Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge EventMesh Function branch to master #5019

Merged
merged 27 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8481187
EventMesh function admin (#4851)
sodaRyCN Apr 18, 2024
02bcd35
EventMesh function admin (#4853)
sodaRyCN Apr 19, 2024
02f6d44
Eventmesh function admin (#4854)
sodaRyCN Apr 22, 2024
859ad8d
EventMesh function connector runtime (#4858)
xwm1992 Apr 22, 2024
78942c4
[ISSUE #4931]Add Registry Module for Discovery AdminServer
sodaRyCN May 29, 2024
aa8f604
[ISSUES #4933]Add Admin Module
sodaRyCN May 29, 2024
1cb3b02
[ISSUE #4935] Add and Move the Pojo Used By Both Runtime and Admin to…
sodaRyCN May 29, 2024
cb1b7b8
[ISSUE #4937]fix gradle dependecy and add runtime v2
sodaRyCN May 29, 2024
b4d3b2a
[ISSUES #4939]add canal connector
sodaRyCN May 29, 2024
c673a19
fix conflicts with master
xwm1992 May 29, 2024
a224b51
fix missing apache header
xwm1992 May 29, 2024
864cda0
fix missing apache header
xwm1992 May 29, 2024
ae3cbfc
fix missing apache header
xwm1992 May 29, 2024
6ab26df
update gradle dependencies
xwm1992 May 29, 2024
0a5686e
fix admin server ci check error
xwm1992 May 29, 2024
9934aa1
fix admin server ci check error
xwm1992 May 29, 2024
715c595
fix ci checkStyle error
xwm1992 May 30, 2024
0b9f6d0
Merge branch 'master' of https://github.com/apache/eventmesh into eve…
xwm1992 May 30, 2024
bfafbfb
fix ci check error
xwm1992 May 31, 2024
3550ac3
[ISSUE #4979]Canal Connector supports bidirectional data synchronization
xwm1992 Jun 5, 2024
5f1c1af
add bash files for admin & runtime-v2
xwm1992 Jun 6, 2024
8f4748f
fix ack offset read & persist
xwm1992 Jun 28, 2024
50a204e
fix checkStyle error
xwm1992 Jun 28, 2024
ae621d4
[ISSUE #4979] Canal Connector supports bidirectional data synchroniza…
xwm1992 Jul 1, 2024
fb411ba
Merge branch 'eventmesh-function' of https://github.com/apache/eventm…
xwm1992 Jul 1, 2024
f2d7645
solve conflicts with master branch
xwm1992 Jul 1, 2024
4e638f5
fix http source connector test error
xwm1992 Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[ISSUE #4979] Canal Connector supports bidirectional data synchroniza…
…tion (#5011)

* [ISSUE #4979]Canal Connector supports bidirectional data synchronization

* add bash files for admin & runtime-v2

* fix ack offset read & persist

* fix checkStyle error
  • Loading branch information
xwm1992 authored Jul 1, 2024
commit ae621d49082cbe7506c72ad1f553efbf3bc96006
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ tasks.register('dist') {
["eventmesh-common",
"eventmesh-meta:eventmesh-meta-api",
"eventmesh-metrics-plugin:eventmesh-metrics-api",
"eventmesh-openconnect:eventmesh-openconnect-java",
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
"eventmesh-protocol-plugin:eventmesh-protocol-api",
"eventmesh-registry:eventmesh-registry-api",
"eventmesh-retry:eventmesh-retry-api",
"eventmesh-runtime",
"eventmesh-runtime-v2",
"eventmesh-security-plugin:eventmesh-security-api",
"eventmesh-spi",
"eventmesh-starter",
Expand Down Expand Up @@ -660,6 +664,7 @@ subprojects {

dependencyManagement {
dependencies {

dependency "org.apache.commons:commons-lang3:3.6"
dependency "org.apache.commons:commons-collections4:4.4"
dependency "org.apache.commons:commons-text:1.9"
Expand Down Expand Up @@ -709,7 +714,7 @@ subprojects {
dependency "com.mebigfatguy.fb-contrib:fb-contrib:7.6.0"
dependency "com.jayway.jsonpath:json-path:2.9.0"

dependency "org.springframework.boot:spring-boot-starter-web:2.7.18"
dependency "org.springframework.boot:spring-boot-starter-web:2.5.4"
dependency "io.openmessaging:registry-server:0.0.1"

dependency "org.junit.jupiter:junit-jupiter:5.6.0"
Expand Down
201 changes: 201 additions & 0 deletions eventmesh-admin-server/bin/start-admin.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#!/bin/bash
#
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
set -e
# Server configuration may be inconsistent, add these configurations to avoid garbled code problems
export LANG=en_US.UTF-8
export LC_CTYPE=en_US.UTF-8
export LC_ALL=en_US.UTF-8

TMP_JAVA_HOME="/customize/your/java/home/here"

# Detect operating system.
OS=$(uname)

function is_java8_or_11 {
local _java="$1"
[[ -x "$_java" ]] || return 1
[[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] || return 2
return 0
}

function extract_java_version {
local _java="$1"
local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print "11"; else print "unknown"}')
echo "$version"
}

# 0(not running), 1(is running)
#function is_proxyRunning {
# local _pid="$1"
# local pid=`ps ax | grep -i 'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep | awk '{print $1}'|grep $_pid`
# if [ -z "$pid" ] ; then
# return 0
# else
# return 1
# fi
#}

function get_pid {
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
fi
fi
echo "$ppid";
}

#===========================================================================================
# Locate Java Executable
#===========================================================================================

if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
JAVA="$TMP_JAVA_HOME/bin/java"
JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
JAVA="$JAVA_HOME/bin/java"
JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
elif is_java8_or_11 "$(which java)"; then
JAVA="$(which java)"
JAVA_VERSION=$(extract_java_version "$(which java)")
else
echo -e "ERROR\t Java 8 or 11 not found, operation abort."
exit 9;
fi

echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"

EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd)
export EVENTMESH_ADMIN_HOME

EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs"
export EVENTMESH_ADMIN_LOG_HOME

echo -e "EVENTMESH_ADMIN_HOME : ${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}"

function make_logs_dir {
if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p "${EVENTMESH_ADMIN_LOG_HOME}"; fi
}

error_exit ()
{
echo -e "ERROR\t $1 !!"
exit 1
}

export JAVA_HOME

#===========================================================================================
# JVM Configuration
#===========================================================================================
#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M -Xmn256m -XX:SurvivorRatio=4"
#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M -Xmn128m -XX:SurvivorRatio=4"
#fi

GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"

#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
# Set JAVA_OPT for Java 8
JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
elif [[ "$JAVA_VERSION" == "11" ]]; then
# Set JAVA_OPT for Java 11
XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} -Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
fi
JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME} -XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml"
JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}"
JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf"
JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf"
JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"

#if [ -f "pid.file" ]; then
# pid=`cat pid.file`
# if ! is_proxyRunning "$pid"; then
# echo "proxy is running already"
# exit 9;
# else
# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi

pid=$(get_pid)
if [[ $pid == "ERROR"* ]]; then
echo -e "${pid}"
exit 9
fi
if [ -n "$pid" ]; then
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
fi

make_logs_dir

echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out

EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.common.remote.job.JobTransportType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;

import java.util.List;
import java.util.Map;

import lombok.Data;
Expand All @@ -42,7 +43,7 @@ public class EventMeshJobDetail {

private String sinkConnectorDesc;

private RecordPosition position;
private List<RecordPosition> position;

private JobState state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
PositionHandlerFactory factory;

// called isValidateReportRequest before call this
public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) {
public List<RecordPosition> getPosition(FetchPositionRequest request, Metadata metadata) {
if (request == null) {
return null;
}
Expand Down Expand Up @@ -68,7 +70,7 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata)
return handler.handler(request, metadata);
}

public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) {
public List<RecordPosition> getPositionByJobID(Integer jobID, DataSourceType type) {
if (jobID == null || type == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;

import java.util.List;

/**
* IFetchPositionHandler
*/
public interface IFetchPositionHandler {

RecordPosition handler(FetchPositionRequest request, Metadata metadata);
List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.utils.JsonUtils;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -142,20 +143,21 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
}

@Override
public RecordPosition handler(FetchPositionRequest request, Metadata metadata) {
EventMeshMysqlPosition position = positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) {
List<EventMeshMysqlPosition> positionList = positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
request.getJobID()));
RecordPosition recordPosition = null;
if (position != null) {
List<RecordPosition> recordPositionList = new ArrayList<>();
for (EventMeshMysqlPosition position : positionList) {
RecordPosition recordPosition = new RecordPosition();
CanalRecordPartition partition = new CanalRecordPartition();
partition.setTimeStamp(position.getTimestamp());
partition.setJournalName(position.getJournalName());
recordPosition.setRecordPartition(partition);
CanalRecordOffset offset = new CanalRecordOffset();
offset.setOffset(position.getPosition());
recordPosition = new RecordPosition();
recordPosition.setRecordPartition(partition);
recordPosition.setRecordOffset(offset);
recordPositionList.add(recordPosition);
}
return recordPosition;
return recordPositionList;
}
}
4 changes: 2 additions & 2 deletions eventmesh-admin-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
spring:
datasource:
url: jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: mike920830
username: //db_username
password: //db_password
driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {

private Integer batchSize = 50; // batchSize
// batchSize
private Integer batchSize = 50;

private Boolean useBatch = true; // enable batch
// enable batch
private Boolean useBatch = true;

private Integer poolSize = 5; // sink thread size for single channel
// sink thread size for single channel
private Integer poolSize = 5;

private SyncMode syncMode; // sync mode: field/row
// sync mode: field/row
private SyncMode syncMode;

private Boolean skipException = false; // skip sink process exception
// skip sink process exception
private Boolean skipException = false;

public SinkConnectorConfig sinkConnectorConfig;

Expand Down
Loading
Loading