Skip to content

Commit

Permalink
mongo db sync plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jianying.wcj committed Apr 7, 2015
1 parent dc069f2 commit 02534a7
Showing 1 changed file with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
Expand All @@ -14,7 +12,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
* Created by jianying.wcj on 2015/3/17 0017.
Expand All @@ -23,12 +20,12 @@ public class MongoUtil {

public static MongoClient initMongoClient(Configuration conf) {

String address = conf.getString(KeyConstant.MONGO_ADDRESS);
if(Strings.isNullOrEmpty(address)) {
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
if(addressList == null || addressList.size() <= 0) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
}
try {
return new MongoClient(parseServerAddress(address));
return new MongoClient(parseServerAddress(addressList));
} catch (UnknownHostException e) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
} catch (NumberFormatException e) {
Expand All @@ -40,13 +37,13 @@ public static MongoClient initMongoClient(Configuration conf) {

public static MongoClient initCredentialMongoClient(Configuration conf,String userName,String password) {

String address = conf.getString(KeyConstant.MONGO_ADDRESS);
if(isHostPortPattern(address)) {
List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
if(!isHostPortPattern(addressList)) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
}
try {
MongoCredential credential = MongoCredential.createMongoCRCredential(userName, "admin", password.toCharArray());
return new MongoClient(parseServerAddress(address), Arrays.asList(credential));
return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential));

} catch (UnknownHostException e) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
Expand All @@ -58,36 +55,45 @@ public static MongoClient initCredentialMongoClient(Configuration conf,String us
}
/**
* 判断地址类型是否符合要求
* @param addressListStr
* @param addressList
* @return
*/
private static boolean isHostPortPattern(String addressListStr) {
Iterable<String> ms = Splitter.on(",").split(addressListStr);
private static boolean isHostPortPattern(List<Object> addressList) {
boolean isMatch = false;
for(String address : ms) {
for(Object address : addressList) {
String regex = "([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+)";
if(address.matches(regex)) {
if(((String)address).matches(regex)) {
isMatch = true;
}
}
return isMatch;
}
/**
* 转换为mongo地址协议
* @param address
* @param rawAddressList
* @return
*/
private static List<ServerAddress> parseServerAddress(String address) throws UnknownHostException{
private static List<ServerAddress> parseServerAddress(List<Object> rawAddressList) throws UnknownHostException{
List<ServerAddress> addressList = new ArrayList<ServerAddress>();
Map<String,String> ms = Splitter.on(",").withKeyValueSeparator(":").split(address);
for(Map.Entry<String,String> temp : ms.entrySet()) {
for(Object address : rawAddressList) {
String[] tempAddress = ((String)address).split(":");
try {
ServerAddress sa = new ServerAddress(temp.getKey(),Integer.valueOf(temp.getValue()));
ServerAddress sa = new ServerAddress(tempAddress[0],Integer.valueOf(tempAddress[1]));
addressList.add(sa);
} catch (UnknownHostException e) {
throw new UnknownHostException();
}
}
return addressList;
}

public static void main(String[] args) {
try {
ArrayList hostAddress = new ArrayList();
hostAddress.add("127.0.0.1:27017");
System.out.println(MongoUtil.isHostPortPattern(hostAddress));
} catch (Exception e) {
e.printStackTrace();
}
}
}

0 comments on commit 02534a7

Please sign in to comment.