Rename master to job, slaveContainer to taskGroup, and slave to task
静行 authored and chinan.hjc committed Dec 4, 2014
1 parent 9867dca commit 7540a94
Showing 92 changed files with 1,299 additions and 1,299 deletions.
common/src/main/java/com/alibaba/datax/common/plugin/AbstractJobPlugin.java
@@ -0,0 +1,25 @@
package com.alibaba.datax.common.plugin;

/**
* Created by jingxing on 14-8-24.
*/
public abstract class AbstractJobPlugin extends AbstractPlugin {
/**
* @return the jobPluginCollector
*/
public JobPluginCollector getJobPluginCollector() {
return jobPluginCollector;
}

/**
* @param jobPluginCollector
* the jobPluginCollector to set
*/
public void setJobPluginCollector(
JobPluginCollector jobPluginCollector) {
this.jobPluginCollector = jobPluginCollector;
}

private JobPluginCollector jobPluginCollector;

}

This file was deleted.

This file was deleted.

common/src/main/java/com/alibaba/datax/common/plugin/AbstractTaskPlugin.java
@@ -0,0 +1,25 @@
package com.alibaba.datax.common.plugin;

/**
* Created by jingxing on 14-8-24.
*/
public abstract class AbstractTaskPlugin extends AbstractPlugin {
/**
* @return the taskPluginCollector
*/
public TaskPluginCollector getTaskPluginCollector() {
return taskPluginCollector;
}

/**
* @param taskPluginCollector
* the taskPluginCollector to set
*/
public void setTaskPluginCollector(
TaskPluginCollector taskPluginCollector) {
this.taskPluginCollector = taskPluginCollector;
}

private TaskPluginCollector taskPluginCollector;

}
common/src/main/java/com/alibaba/datax/common/plugin/JobPluginCollector.java
@@ -6,16 +6,16 @@
/**
* Created by jingxing on 14-9-9.
*/
- public interface MasterPluginCollector extends PluginCollector {
+ public interface JobPluginCollector extends PluginCollector {

/**
- * Get custom collected information from the Slave
+ * Get custom collected information from the Task
*
* */
Map<String, List<String>> getMessage();

/**
- * Get custom collected information from the Slave
+ * Get custom collected information from the Task
*
* */
List<String> getMessage(String key);
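
For context, a minimal sketch (not part of this commit) of what the Job side can do with the renamed collector; the class name and the "skippedFiles" key below are made up for illustration.

import java.util.List;
import java.util.Map;

import com.alibaba.datax.common.plugin.JobPluginCollector;

// Sketch: what a Job-side plugin could do with the collector during its POST phase.
// "skippedFiles" is an illustrative key, not a key DataX actually uses.
public class JobMessageDump {

    public static void dump(JobPluginCollector collector) {
        // Values that the Tasks reported under one key.
        List<String> skipped = collector.getMessage("skippedFiles");
        // The full map of everything the Tasks reported.
        Map<String, List<String>> all = collector.getMessage();
        System.out.println(all.size() + " message keys, "
                + (skipped == null ? 0 : skipped.size()) + " skipped files");
    }
}
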
common/src/main/java/com/alibaba/datax/common/plugin/TaskPluginCollector.java
@@ -4,13 +4,13 @@

/**
*
- * This interface is provided for Slave plugins to record dirty data and custom information. <br >
+ * This interface is provided for Task plugins to record dirty data and custom information. <br >
*
- * 1. Dirty data recording: SlavePluginCollector provides several adapters for recording dirty data, including local output, centralized reporting, and so on<br >
- * 2. Custom information: every Slave plugin can collect information through SlavePluginCollector while it runs, <br >
- * and the Master plugin retrieves it via the getMessage() interface during the POST phase
+ * 1. Dirty data recording: TaskPluginCollector provides several adapters for recording dirty data, including local output, centralized reporting, and so on<br >
+ * 2. Custom information: every task plugin can collect information through TaskPluginCollector while it runs, <br >
+ * and the Job plugin retrieves it via the getMessage() interface during the POST phase
*/
- public abstract class SlavePluginCollector implements PluginCollector {
+ public abstract class TaskPluginCollector implements PluginCollector {
/**
* Collect dirty data
*
@@ -50,7 +50,7 @@ public void collectDirtyRecord(final Record dirtyRecord, final Throwable t) {
}

/**
- * Collect custom information; the Master plugin can retrieve it via getMessage <br >
+ * Collect custom information; the Job plugin can retrieve it via getMessage <br >
* If the same key is written more than once, a List is used internally to record the multiple values for that key.<br >
* */
public abstract void collectMessage(final String key, final String value);
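
A matching Task-side sketch, again illustrative only: the class name and message key are hypothetical, and the Record import path is assumed rather than taken from this diff.

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.TaskPluginCollector;

// Sketch of Task-side usage of the renamed TaskPluginCollector.
public class TaskSideUsage {

    public static void handleBadRecord(TaskPluginCollector collector, Record record, Exception cause) {
        // Hand the dirty record to whatever adapter is configured (local output, centralized report, ...).
        collector.collectDirtyRecord(record, cause);
        // Leave a message the Job-side plugin can later read back with getMessage("skippedFiles").
        collector.collectMessage("skippedFiles", String.valueOf(cause.getMessage()));
    }
}
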
12 changes: 6 additions & 6 deletions common/src/main/java/com/alibaba/datax/common/spi/Reader.java
@@ -3,23 +3,23 @@
import java.util.List;

import com.alibaba.datax.common.base.BaseObject;
- import com.alibaba.datax.common.plugin.AbstractMasterPlugin;
- import com.alibaba.datax.common.plugin.AbstractSlavePlugin;
+ import com.alibaba.datax.common.plugin.AbstractJobPlugin;
+ import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.plugin.RecordSender;

/**
- * Each Reader plugin implements two inner classes, Master and Slave
+ * Each Reader plugin implements two inner classes, Job and Task
*
*
* */
public abstract class Reader extends BaseObject {

/**
- * Each Reader plugin must implement the Master inner class
+ * Each Reader plugin must implement the Job inner class
*
* */
- public static abstract class Master extends AbstractMasterPlugin {
+ public static abstract class Job extends AbstractJobPlugin {

/**
* Split the job into tasks
@@ -46,7 +46,7 @@ public static abstract class Master extends AbstractMasterPlugin {
public abstract List<Configuration> split(int adviceNumber);
}

- public static abstract class Slave extends AbstractSlavePlugin {
+ public static abstract class Task extends AbstractTaskPlugin {
public abstract void startRead(RecordSender recordSender);
}
}
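
Under the new naming, a Reader plugin then looks roughly like the following skeleton (not part of this commit). MyReader is hypothetical, and init()/destroy() are assumed to be the remaining lifecycle methods; only split() and startRead() are visible in this diff.

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;

// Skeleton of a Reader under the new naming: Job splits the work, Task does the reading.
public class MyReader extends Reader {

    public static class Job extends Reader.Job {
        public void init() { }

        public void destroy() { }

        public List<Configuration> split(int adviceNumber) {
            // A real reader returns one Configuration per task; this placeholder returns an empty list.
            return new ArrayList<Configuration>();
        }
    }

    public static class Task extends Reader.Task {
        public void init() { }

        public void destroy() { }

        public void startRead(RecordSender recordSender) {
            // Build Records and hand them to recordSender here.
        }
    }
}
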
14 changes: 7 additions & 7 deletions common/src/main/java/com/alibaba/datax/common/spi/Writer.java
@@ -1,23 +1,23 @@
package com.alibaba.datax.common.spi;

import com.alibaba.datax.common.base.BaseObject;
- import com.alibaba.datax.common.plugin.AbstractMasterPlugin;
- import com.alibaba.datax.common.plugin.AbstractSlavePlugin;
+ import com.alibaba.datax.common.plugin.AbstractJobPlugin;
+ import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.plugin.RecordReceiver;

import java.util.List;

/**
- * Each Writer plugin implements the Writer class and, inside it, the Master and Slave inner classes
+ * Each Writer plugin implements the Writer class and, inside it, the Job and Task inner classes
*
*
* */
public abstract class Writer extends BaseObject {
/**
- * Each Writer plugin must implement the Master inner class
+ * Each Writer plugin must implement the Job inner class
*/
- public abstract static class Master extends AbstractMasterPlugin {
+ public abstract static class Job extends AbstractJobPlugin {
/**
* Split the job into tasks.<br>
*
@@ -29,9 +29,9 @@ public abstract static class Master extends AbstractMasterPlugin {
}

/**
- * Each Writer plugin must implement the Slave inner class
+ * Each Writer plugin must implement the Task inner class
*/
- public abstract static class Slave extends AbstractSlavePlugin {
+ public abstract static class Task extends AbstractTaskPlugin {
public abstract void startWrite(RecordReceiver lineReceiver);
}
}
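
The corresponding Writer skeleton, with the same caveats: MyWriter is hypothetical, init()/destroy() are assumed, and the exact split() signature is inferred by symmetry with Reader rather than shown in this hunk.

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;

// Skeleton of a Writer under the new naming: Job splits the work, Task does the writing.
public class MyWriter extends Writer {

    public static class Job extends Writer.Job {
        public void init() { }

        public void destroy() { }

        public List<Configuration> split(int mandatoryNumber) {
            // A real writer returns one Configuration per task; this placeholder returns an empty list.
            return new ArrayList<Configuration>();
        }
    }

    public static class Task extends Writer.Task {
        public void init() { }

        public void destroy() { }

        public void startWrite(RecordReceiver lineReceiver) {
            // Drain records from lineReceiver and write them to the target here.
        }
    }
}
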
common/src/main/java/com/alibaba/datax/common/util/Configuration.java
@@ -19,10 +19,10 @@
* <p/>
* Example code:<br>
* <p/>
- * Get the master's configuration<br>
+ * Get the job's configuration<br>
* Configuration configuration = Configuration.from(new File("Config.json")); <br>
- * String masterContainerClass =
- * configuration.getString("core.container.master.class"); <br>
+ * String jobContainerClass =
+ * configuration.getString("core.container.job.class"); <br>
* <p/>
* <br>
* Set a nested (multi-level) List <br>
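
A small usage sketch of the renamed key, following the javadoc's own example; the "core.json" file name here is illustrative.

import java.io.File;

import com.alibaba.datax.common.util.Configuration;

// Reproduces the javadoc example with the renamed key.
public class ConfigExample {
    public static void main(String[] args) {
        Configuration configuration = Configuration.from(new File("core.json"));
        // Before this commit the key was core.container.master.class.
        String jobContainerClass = configuration.getString("core.container.job.class");
        System.out.println("job container class: " + jobContainerClass);
    }
}
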
14 changes: 7 additions & 7 deletions common/src/test/resources/all.json
@@ -31,23 +31,23 @@
}
},
"container": {
"master": {
"class": "com.alibaba.datax.core.container.MasterContainer",
"job": {
"class": "com.alibaba.datax.core.container.JobContainer",
"reportInterval": 1000
},
"slave": {
"class": "com.alibaba.datax.core.container.SlaveContainer",
"taskGroup": {
"class": "com.alibaba.datax.core.container.TaskGroupContainer",
"channel": 3
}
},
"statistics": {
"collector": {
"container": {
"masterClass": "com.alibaba.datax.core.statistics.collector.container.standalone.MasterContainerCollector",
"slaveClass": "com.alibaba.datax.core.statistics.collector.container.standalone.SlaveContainerCollector"
"jobClass": "com.alibaba.datax.core.statistics.collector.container.standalone.JobContainerCollector",
"taskGroupClass": "com.alibaba.datax.core.statistics.collector.container.standalone.TaskGroupContainerCollector"
},
"plugin": {
"slaveClass": "com.alibaba.datax.core.statistics.collector.plugin.slave.StdoutPluginCollector",
"taskClass": "com.alibaba.datax.core.statistics.collector.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 1000
}
}
10 changes: 5 additions & 5 deletions core/src/main/bin/datax.py
@@ -158,7 +158,7 @@ def get_json_job_path(job_path):
is_resaved_json = True

if is_job_from_http:
- # write masterId and reportAddress into the config
+ # write jobId and reportAddress into the config
job_json_content = add_core_config_for_http(job_json_content, job_path)
if not job_json_content:
print >>sys.stderr, "add core config for http error"
@@ -169,16 +169,16 @@ def get_json_job_path(job_path):
return job_new_path, is_resaved_json

def add_core_config_for_http(job_json_content, job_path):
- job_id = get_masterId_from_http(job_path)
+ job_id = get_jobId_from_http(job_path)
if job_id:
job_json_content = json.loads(job_json_content)
job_json_content["core"] = {"container":{"master":{"id":job_id}}}
job_json_content["core"] = {"container":{"job":{"id":job_id}}}
else:
return None

return json.dumps(job_json_content, sort_keys=True, indent=4)

- def get_masterId_from_http(job_path):
+ def get_jobId_from_http(job_path):
m = re.match(r"^http[s]?://\S+/(\d+)\w*", job_path)
if m:
return m.group(1)
@@ -199,7 +199,7 @@ def save_to_tmp_file(job_path, is_job_from_http, job_json_content):

tmp_file_path = None
if is_job_from_http:
- tmp_file_path = get_masterId_from_http(job_path)
+ tmp_file_path = get_jobId_from_http(job_path)
if not tmp_file_path:
sys.exit(RET_STATE["FAIL"])
else:
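
The jobId extraction that get_jobId_from_http performs can be illustrated in Java as follows; the URL below is a made-up example.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Same regex as get_jobId_from_http, written in Java for illustration.
public class JobIdFromUrl {
    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("^http[s]?://\\S+/(\\d+)\\w*");
        Matcher m = pattern.matcher("http://example.com/jobs/12345");
        if (m.find()) {
            // datax.py writes this id under core.container.job.id (formerly core.container.master.id).
            System.out.println("jobId = " + m.group(1));
        }
    }
}
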
16 changes: 8 additions & 8 deletions core/src/main/conf/core.json
@@ -14,7 +14,7 @@
}
},
"core": {
"clusterManager": {
"dataXService": {
"address": "http://localhost:7001/api",
"timeout": 3000
},
@@ -34,23 +34,23 @@
}
},
"container": {
"master": {
"class": "com.alibaba.datax.core.container.MasterContainer",
"job": {
"class": "com.alibaba.datax.core.container.JobContainer",
"reportInterval": 10000
},
"slave": {
"class": "com.alibaba.datax.core.container.SlaveContainer",
"taskGroup": {
"class": "com.alibaba.datax.core.container.TaskGroupContainer",
"channel": 5
}
},
"statistics": {
"collector": {
"container": {
"masterClass": "com.alibaba.datax.core.statistics.collector.container.standalone.MasterContainerCollector",
"slaveClass": "com.alibaba.datax.core.statistics.collector.container.standalone.SlaveContainerCollector"
"jobClass": "com.alibaba.datax.core.statistics.collector.container.standalone.JobContainerCollector",
"taskGroupClass": "com.alibaba.datax.core.statistics.collector.container.standalone.TaskGroupContainerCollector"
},
"plugin": {
"slaveClass": "com.alibaba.datax.core.statistics.collector.plugin.slave.StdoutPluginCollector",
"taskClass": "com.alibaba.datax.core.statistics.collector.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 1000
}
}
14 changes: 7 additions & 7 deletions core/src/main/java/com/alibaba/datax/core/Engine.java
@@ -21,12 +21,12 @@
import java.util.UUID;

/**
- * Engine is DataX's entry class; it initializes the Master or Slave run container and runs the plugin's Master or Slave logic
+ * Engine is DataX's entry class; it initializes the Job or Task run container and runs the plugin's Job or Task logic
*/
public class Engine {
private static final Logger LOG = LoggerFactory.getLogger(Engine.class);

- /* check job model (master/slave) first */
+ /* check job model (job/task) first */
public void start(Configuration allConf) {

// Bind column conversion information
@@ -37,17 +37,17 @@ public void start(Configuration allConf) {
*/
LoadUtil.bind(allConf);

- boolean isMaster = !("slaveContainer".equalsIgnoreCase(allConf
+ boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));

AbstractContainer container;
- if (isMaster) {
+ if (isJob) {
container = ClassUtil.instantiate(allConf
- .getString(CoreConstant.DATAX_CORE_CONTAINER_MASTER_CLASS),
+ .getString(CoreConstant.DATAX_CORE_CONTAINER_JOB_CLASS),
AbstractContainer.class, allConf);
} else {
container = ClassUtil.instantiate(allConf
- .getString(CoreConstant.DATAX_CORE_CONTAINER_SLAVE_CLASS),
+ .getString(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CLASS),
AbstractContainer.class, allConf);
}

@@ -126,7 +126,7 @@ public static void entry(final String[] args) throws Throwable {
Configuration configuration = ConfigParser.parse(jobPath);

String jobId = configuration.getString(
- CoreConstant.DATAX_CORE_CONTAINER_MASTER_ID,
+ CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
UUID.randomUUID().toString());
Engine.confLog(jobId);
