Skip to content

Commit

Permalink
[ZEPPELIN-5053]. Transfer Zeppelin server configuration to a remote i…
Browse files Browse the repository at this point in the history
…nterpreter

### What is this PR for?

This PR add method `init` to pass `ZeppelinConfiguration` in the thrift interface between zeppelin server and interpreter process. And put lifecycle manager initialization in the init method. Zeppelin server will call this init method after interpreter process is started (register with zeppelin server)

### What type of PR is it?
[Bug Fix ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5053

### How should this be tested?
* https://travis-ci.org/github/zjffdu/zeppelin/builds/730513842

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? NO
* Is there breaking changes for older versions? NO
* Does this needs documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes apache#3917 from zjffdu/ZEPPELIN-5053 and squashes the following commits:

ff6dc73 [Jeff Zhang] set property if env is set
58400da [Jeff Zhang] [ZEPPELIN-5053]. Transfer Zeppelin server configuration to a remote interpreter
  • Loading branch information
zjffdu committed Oct 3, 2020
1 parent 2e69f71 commit 438f0b2
Show file tree
Hide file tree
Showing 44 changed files with 980 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -480,4 +480,4 @@ after_failure:
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
- cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
- cat logs/zeppelin-interpreter-flink-*.log
- cat logs/zeppelin-interpreter-*.log
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void start(Class clazz) throws IOException {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "120000");
conf = new ZeppelinConfiguration();
conf = ZeppelinConfiguration.create();
interpreterSettingManager = new InterpreterSettingManager(conf,
mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
interpreterFactory = new InterpreterFactory(interpreterSettingManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class ClusterMonitor {
public ClusterMonitor(ClusterManager clusterManagerServer) {
this.clusterManager = clusterManagerServer;

ZeppelinConfiguration zconf = new ZeppelinConfiguration();
ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
heartbeatInterval = zconf.getClusterHeartbeatInterval();
heartbeatTimeout = zconf.getClusterHeartbeatTimeout();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,22 @@ private void initProperties() {
for (ConfigurationNode p : nodes) {
String name = (String) p.getChildren("name").get(0).getValue();
String value = (String) p.getChildren("value").get(0).getValue();
if (!StringUtils.isEmpty(name)) {
if (StringUtils.isNotBlank(name) && StringUtils.isNotBlank(value)) {
properties.put(name, value);
}
}
}


public ZeppelinConfiguration() {
// private constructor, so that it is singleton.
private ZeppelinConfiguration() {
ConfVars[] vars = ConfVars.values();
for (ConfVars v : vars) {
if (v.getType() == ConfVars.VarType.BOOLEAN) {
// set property if env is set, so that the configuration can be passed to
// interpreter process properly.
if (StringUtils.isNotBlank(System.getenv(v.name()))) {
this.setProperty(v.getVarName(), System.getenv(v.name()));
} else if (v.getType() == ConfVars.VarType.BOOLEAN) {
this.setProperty(v.getVarName(), v.getBooleanValue());
} else if (v.getType() == ConfVars.VarType.LONG) {
this.setProperty(v.getVarName(), v.getLongValue());
Expand Down Expand Up @@ -184,12 +189,18 @@ public static synchronized ZeppelinConfiguration create() {
return conf;
}

public Map<String, String> getProperties() {
return this.properties;
}

public static void reset() {
conf = null;
}

public void setProperty(String name, String value) {
this.properties.put(name, value);
if (StringUtils.isNotBlank(name) && StringUtils.isNotBlank(value)) {
this.properties.put(name, value);
}
}

private String getStringValue(String name, String d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.cluster.ClusterManagerClient;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
Expand Down Expand Up @@ -145,9 +144,9 @@ public class RemoteInterpreterServer extends Thread
private ScheduledExecutorService resultCleanService = Executors.newSingleThreadScheduledExecutor();

private boolean isTest;


private ZeppelinConfiguration zConf;
// cluster manager client
private ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
private ClusterManagerClient clusterManagerClient;

public RemoteInterpreterServer(String intpEventServerHost,
Expand Down Expand Up @@ -192,12 +191,6 @@ public RemoteInterpreterServer(String intpEventServerHost,
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());

if (zConf.isClusterMode()) {
clusterManagerClient = ClusterManagerClient.getInstance(zConf);
clusterManagerClient.start(interpreterGroupId);
}

lifecycleManager = createLifecycleManager();
}

@Override
Expand All @@ -215,28 +208,18 @@ public void run() {
interrupted = true;
}
}

if (zConf.isClusterMode()) {
// Cluster mode, discovering interpreter processes through metadata registration
// TODO (Xun): Unified use of cluster metadata for process discovery of all operating modes
// 1. Can optimize the startup logic of the process
// 2. Can solve the problem that running the interpreter's IP in docker may be a virtual IP
putClusterMeta();
} else {
if (!interrupted) {
RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
if (!interrupted) {
RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
try {
LOGGER.info("Registering interpreter process");
intpEventClient.registerInterpreterProcess(registerInfo);
LOGGER.info("Registered interpreter process");
} catch (Exception e) {
LOGGER.error("Error while registering interpreter: {}, cause: {}", registerInfo, e);
try {
LOGGER.info("Registering interpreter process");
intpEventClient.registerInterpreterProcess(registerInfo);
LOGGER.info("Registered interpreter process");
lifecycleManager.onInterpreterProcessStarted(interpreterGroupId);
} catch (Exception e) {
LOGGER.error("Error while registering interpreter: {}", registerInfo, e);
try {
shutdown();
} catch (TException e1) {
LOGGER.warn("Exception occurs while shutting down", e1);
}
shutdown();
} catch (TException e1) {
LOGGER.warn("Exception occurs while shutting down", e1);
}
}
}
Expand Down Expand Up @@ -266,6 +249,32 @@ public void run() {
server.serve();
}

@Override
public void init(Map<String, String> properties) throws TException {
this.zConf = ZeppelinConfiguration.create();
for (Map.Entry<String, String> entry : properties.entrySet()) {
this.zConf.setProperty(entry.getKey(), entry.getValue());
}

if (zConf.isClusterMode()) {
clusterManagerClient = ClusterManagerClient.getInstance(zConf);
clusterManagerClient.start(interpreterGroupId);

// Cluster mode, discovering interpreter processes through metadata registration
// TODO (Xun): Unified use of cluster metadata for process discovery of all operating modes
// 1. Can optimize the startup logic of the process
// 2. Can solve the problem that running the interpreter's IP in docker may be a virtual IP
putClusterMeta();
}

try {
lifecycleManager = createLifecycleManager();
lifecycleManager.onInterpreterProcessStarted(interpreterGroupId);
} catch (Exception e) {
throw new TException("Fail to create LifeCycleManager", e);
}
}

@Override
public void shutdown() throws TException {

Expand Down Expand Up @@ -339,6 +348,14 @@ public void shutdown() throws TException {
shutDownThread.start();
}

public ZeppelinConfiguration getConf() {
return this.zConf;
}

public LifecycleManager getLifecycleManager() {
return this.lifecycleManager;
}

public int getPort() {
return port;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class AngularObjectId implements org.apache.thrift.TBase<AngularObjectId, AngularObjectId._Fields>, java.io.Serializable, Cloneable, Comparable<AngularObjectId> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class AppOutputAppendEvent implements org.apache.thrift.TBase<AppOutputAppendEvent, AppOutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputAppendEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputAppendEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class AppOutputUpdateEvent implements org.apache.thrift.TBase<AppOutputUpdateEvent, AppOutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppOutputUpdateEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputUpdateEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class AppStatusUpdateEvent implements org.apache.thrift.TBase<AppStatusUpdateEvent, AppStatusUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<AppStatusUpdateEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppStatusUpdateEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class OutputAppendEvent implements org.apache.thrift.TBase<OutputAppendEvent, OutputAppendEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputAppendEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputAppendEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class OutputUpdateAllEvent implements org.apache.thrift.TBase<OutputUpdateAllEvent, OutputUpdateAllEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateAllEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateAllEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class OutputUpdateEvent implements org.apache.thrift.TBase<OutputUpdateEvent, OutputUpdateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<OutputUpdateEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class ParagraphInfo implements org.apache.thrift.TBase<ParagraphInfo, ParagraphInfo._Fields>, java.io.Serializable, Cloneable, Comparable<ParagraphInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ParagraphInfo");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RegisterInfo implements org.apache.thrift.TBase<RegisterInfo, RegisterInfo._Fields>, java.io.Serializable, Cloneable, Comparable<RegisterInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RegisterInfo");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RemoteInterpreterEventService {

public interface Iface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;


@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
NO_OP(1),
ANGULAR_OBJECT_ADD(2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.apache.zeppelin.interpreter.thrift;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-14")
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-09-22")
public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase<RemoteInterpreterResultMessage, RemoteInterpreterResultMessage._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResultMessage> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage");

Expand Down
Loading

0 comments on commit 438f0b2

Please sign in to comment.