Skip to content

Commit

Permalink
Revert "Support runscript callbackHeader (#1787)" (#1789)
Browse files Browse the repository at this point in the history
This reverts commit e56aaaf.
  • Loading branch information
allwefantasy authored Jun 8, 2022
1 parent e56aaaf commit 445b081
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import net.csdn.common.path.Url
import net.csdn.modules.transport.HttpTransportService.SResponse
import net.csdn.modules.transport.{DefaultHttpTransportService, HttpTransportService}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.http.{HttpEntity, HttpResponse}
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.fluent.{Form, Request}
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.{HttpMultipartMode, MultipartEntityBuilder}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils
import org.apache.http.{HttpEntity, HttpResponse}
import streaming.dsl.ScriptSQLExec
import streaming.log.WowLog
import tech.mlsql.common.JsonUtils
Expand All @@ -22,21 +22,17 @@ import tech.mlsql.tool.{HDFSOperatorV2, Templates2}
import java.nio.charset.Charset
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.control.Breaks.{break, breakable}

object RestUtils extends Logging with WowLog {
def httpClientPost(urlString: String, data: Map[String, String], headers: Map[String, String]): HttpResponse = {
def httpClientPost(urlString: String, data: Map[String, String]): HttpResponse = {
val nameValuePairs = data.map { case (name, value) =>
new BasicNameValuePair(name, value)
}.toList

val req = Request.Post(urlString)
Request.Post(urlString)
.addHeader("Content-Type", "application/x-www-form-urlencoded")

headers foreach { case (name, value) =>
req.setHeader(name, value)
}

req.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset))
.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset))
.execute()
.returnResponse()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import org.apache.spark.sql.mlsql.session.{MLSQLSparkSession, SparkSessionCacheM
import org.apache.spark.{MLSQLConf, SparkInstanceService}
import tech.mlsql.MLSQLEnvKey
import tech.mlsql.app.{CustomController, ResultResp}
import tech.mlsql.common.JsonUtils
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.common.utils.serder.json.JSONTool
import tech.mlsql.crawler.RestUtils
Expand Down Expand Up @@ -106,7 +105,6 @@ class RestController extends ApplicationController with WowLog with Logging {
new Parameter(name = "sessionPerRequest", required = false, description = "by default false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "async", required = false, description = "If set true ,please also provide a callback url use `callback` parameter and the job will run in background and the API will return. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "callback", required = false, description = "Used when async is set true. callback is a url. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "callbackHeader", required = false, description = "Provide a jsonString parameter to set the header parameter of the callback request. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "maxRetries", required = false, description = "Max retries of request callback.", `type` = "int", allowEmptyValue = false),
new Parameter(name = "skipInclude", required = false, description = "disable include statement. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "skipAuth", required = false, description = "disable table authorize . default: true", `type` = "boolean", allowEmptyValue = false),
Expand Down Expand Up @@ -149,12 +147,6 @@ class RestController extends ApplicationController with WowLog with Logging {
if (paramAsBoolean("async", false)) {
JobManager.asyncRun(sparkSession, jobInfo, () => {
val urlString = param("callback")
val callbackHeaderString = param("callbackHeader")
var callbackHeader = Map[String,String]()
if (callbackHeaderString != null && callbackHeaderString.nonEmpty){
callbackHeader = JsonUtils.fromJson[Map[String,String]](callbackHeaderString)
}

val maxTries = Math.max(0, paramAsInt("maxRetries", -1)) + 1
try {
ScriptSQLExec.parse(param("sql"), context,
Expand All @@ -166,7 +158,7 @@ class RestController extends ApplicationController with WowLog with Logging {
outputResult = getScriptResult(context, sparkSession)

executeWithRetrying[HttpResponse](maxTries)(
RestUtils.httpClientPost(urlString, callbackHeader,
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""succeeded""",
"res" -> outputResult,
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
Expand All @@ -183,7 +175,7 @@ class RestController extends ApplicationController with WowLog with Logging {
}

executeWithRetrying[HttpResponse](maxTries)(
RestUtils.httpClientPost(urlString, Map(),
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""failed""",
"msg" -> (e.getMessage + "\n" + msgBuffer.mkString("\n")),
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
Expand Down

0 comments on commit 445b081

Please sign in to comment.