Skip to content

Commit

Permalink
[CELEBORN-1493] Check admin privileges for http mutative requests
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

If authentication enabled, check admin privileges for http mutative requests.

Likes:

```
POST /api/v1/workers/exclude
POST /api/v1/workers/events
POST /api/v1/workers/exit
```

### Why are the changes needed?

For security requirement.

### Does this PR introduce _any_ user-facing change?
Yes, after this pr, if http authentication enabled, for all mutative http requests, it will check the admin privileges.

Before this PR, if an API is not defined and the method is `POST/PUT/DELETE/PATCH`, the response status code is `404`.
After this PR, if the admin privileges check failed, the response status code will be `403`.

### How was this patch tested?
UT.

Closes apache#2601 from turboFei/admin_check.

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
  • Loading branch information
turboFei authored and RexXiong committed Jul 12, 2024
1 parent bd3f823 commit 09d3a3b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,17 @@ object CelebornConf extends Logging {
.stringConf
.createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)

val MASTER_HTTP_AUTH_ADMINISTERS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.auth.administers")
.categories("master")
.version("0.6.0")
.doc("A comma-separated list of users who have admin privileges," +
s" Note, when ${MASTER_HTTP_AUTH_SUPPORTED_SCHEMES.key} is not set," +
s" everyone is treated as administrator.")
.stringConf
.toSequence
.createWithDefault(Seq.empty)

val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
Expand Down Expand Up @@ -2968,6 +2979,17 @@ object CelebornConf extends Logging {
.stringConf
.createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)

val WORKER_HTTP_AUTH_ADMINISTERS: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.auth.administers")
.categories("worker")
.version("0.6.0")
.doc("A comma-separated list of users who have admin privileges," +
s" Note, when ${WORKER_HTTP_AUTH_SUPPORTED_SCHEMES.key} is not set," +
s" everyone is treated as administrator.")
.stringConf
.toSequence
.createWithDefault(Seq.empty)

val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ license: |
| celeborn.master.heartbeat.application.timeout | 300s | false | Application heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
| celeborn.master.host | &lt;localhost&gt; | false | Hostname for master to bind. | 0.2.0 | |
| celeborn.master.http.auth.administers | | false | A comma-separated list of users who have admin privileges, Note, when celeborn.master.http.auth.supportedSchemes is not set, everyone is treated as administrator. | 0.6.0 | |
| celeborn.master.http.auth.basic.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined password authentication implementation of org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 | |
| celeborn.master.http.auth.bearer.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined token authentication implementation of org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | |
| celeborn.master.http.auth.supportedSchemes | | false | A comma-separated list of master http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI authentication.</li> <li>BASIC: User-defined password authentication, the concreted implementation is configurable via `celeborn.master.http.auth.basic.provider`.</li> <li>BEARER: User-defined bearer token authentication, the concreted implementation is configurable via `celeborn.master.http.auth.bearer.provider`.</li></ul> | 0.6.0 | |
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ license: |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | false | Interval for a Celeborn worker to flush committed file infos into Level DB. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's graceful shutdown timeout time. | 0.2.0 | |
| celeborn.worker.http.auth.administers | | false | A comma-separated list of users who have admin privileges, Note, when celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as administrator. | 0.6.0 | |
| celeborn.worker.http.auth.basic.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined password authentication implementation of org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 | |
| celeborn.worker.http.auth.bearer.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined token authentication implementation of org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | |
| celeborn.worker.http.auth.supportedSchemes | | false | A comma-separated list of worker http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI authentication.</li> <li>BASIC: User-defined password authentication, the concreted implementation is configurable via `celeborn.worker.http.auth.basic.provider`.</li> <li>BEARER: User-defined bearer token authentication, the concreted implementation is configurable via `celeborn.worker.http.auth.bearer.provider`.</li></ul> | 0.6.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.IOException
import javax.security.sasl.AuthenticationException
import javax.servlet.{Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import javax.ws.rs.HttpMethod

import scala.collection.mutable

Expand Down Expand Up @@ -64,6 +65,13 @@ class AuthenticationFilter(conf: CelebornConf, serviceName: String) extends Filt
case Service.WORKER => conf.get(CelebornConf.WORKER_HTTP_PROXY_CLIENT_IP_HEADER)
}

private val administrators: Set[String] = serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS).toSet
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS).toSet
}

private def initAuthHandlers(): Unit = {
if (authSchemes.contains(HttpAuthSchemes.NEGOTIATE)) {
serviceName match {
Expand Down Expand Up @@ -160,8 +168,15 @@ class AuthenticationFilter(conf: CelebornConf, serviceName: String) extends Filt
}
}

private def isMutativeRequest(httpRequest: HttpServletRequest): Boolean = {
HttpMethod.POST.equalsIgnoreCase(httpRequest.getMethod) ||
HttpMethod.PUT.equalsIgnoreCase(httpRequest.getMethod) ||
HttpMethod.DELETE.equalsIgnoreCase(httpRequest.getMethod) ||
HttpMethod.PATCH.equalsIgnoreCase(httpRequest.getMethod)
}

/**
* Delegates call to the servlet filter chain. Sub-classes my override this
* Delegates call to the servlet filter chain. Sub-classes may override this
* method to perform pre and post tasks.
*
* @param filterChain the filter chain object.
Expand All @@ -176,7 +191,14 @@ class AuthenticationFilter(conf: CelebornConf, serviceName: String) extends Filt
filterChain: FilterChain,
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
filterChain.doFilter(request, response)
if (isMutativeRequest(request) && !administrators.contains(HTTP_CLIENT_IDENTIFIER.get())) {
response.setStatus(HttpServletResponse.SC_FORBIDDEN)
response.sendError(
HttpServletResponse.SC_FORBIDDEN,
s"${HTTP_CLIENT_IDENTIFIER.get()} does not have admin privilege to perform ${request.getMethod} action")
} else {
filterChain.doFilter(request, response)
}
}

override def destroy(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
import org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl, UserDefineTokenAuthenticationProviderImpl}

abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
val administers = Seq("celeborn", "celeborn2")
celebornConf
.set(CelebornConf.METRICS_ENABLED.key, "true")
.set(
Expand All @@ -48,6 +49,8 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
.set(
CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER,
classOf[UserDefineTokenAuthenticationProviderImpl].getName)
.set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, administers)
.set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, administers)

def basicAuthorizationHeader(user: String, password: String): String =
HttpAuthSchemes.BASIC + " " + new String(
Expand Down Expand Up @@ -117,4 +120,31 @@ abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
assert(200 == response.getStatus)
assert(response.readEntity(classOf[String]).contains("\"name\" : \"jvm.memory.heap.max\""))
}

test("check admin privilege for mutative request") {
Seq("/any_api", "/api/v1/any_api").foreach { api =>
var response = webTarget.path(api)
.request(MediaType.TEXT_PLAIN)
.header(
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
"no_admin",
UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
.post(null)
assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus)

administers.foreach { admin =>
response = webTarget.path(api)
.request(MediaType.TEXT_PLAIN)
.header(
AUTHORIZATION_HEADER,
basicAuthorizationHeader(
admin,
UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
.post(null)
// pass the admin privilege check, but the api is not found
assert(HttpServletResponse.SC_NOT_FOUND == response.getStatus)
}
}
}
}

0 comments on commit 09d3a3b

Please sign in to comment.