forked from apache/apisix
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: provide skywalking logger plugin (apache#5478)
Co-authored-by: 罗泽轩 <spacewanderlzx@gmail.com> Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
- Loading branch information
1 parent
0570d59
commit 5f6afdb
Showing
8 changed files
with
738 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
-- | ||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||
-- contributor license agreements. See the NOTICE file distributed with | ||
-- this work for additional information regarding copyright ownership. | ||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||
-- (the "License"); you may not use this file except in compliance with | ||
-- the License. You may obtain a copy of the License at | ||
-- | ||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||
-- | ||
-- Unless required by applicable law or agreed to in writing, software | ||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
-- See the License for the specific language governing permissions and | ||
-- limitations under the License. | ||
-- | ||
|
||
local batch_processor = require("apisix.utils.batch-processor") | ||
local log_util = require("apisix.utils.log-util") | ||
local core = require("apisix.core") | ||
local http = require("resty.http") | ||
local url = require("net.url") | ||
local plugin = require("apisix.plugin") | ||
|
||
local base64 = require("ngx.base64") | ||
local ngx_re = require("ngx.re") | ||
|
||
local ngx = ngx | ||
local tostring = tostring | ||
local tonumber = tonumber | ||
local ipairs = ipairs | ||
local timer_at = ngx.timer.at | ||
|
||
local plugin_name = "skywalking-logger" | ||
local stale_timer_running = false | ||
local buffers = {} | ||
|
||
local schema = { | ||
type = "object", | ||
properties = { | ||
endpoint_addr = core.schema.uri_def, | ||
service_name = {type = "string", default = "APISIX"}, | ||
service_instance_name = {type = "string", default = "APISIX Instance Name"}, | ||
timeout = {type = "integer", minimum = 1, default = 3}, | ||
name = {type = "string", default = "skywalking logger"}, | ||
max_retry_count = {type = "integer", minimum = 0, default = 0}, | ||
retry_delay = {type = "integer", minimum = 0, default = 1}, | ||
buffer_duration = {type = "integer", minimum = 1, default = 60}, | ||
inactive_timeout = {type = "integer", minimum = 1, default = 5}, | ||
batch_max_size = {type = "integer", minimum = 1, default = 1000}, | ||
include_req_body = {type = "boolean", default = false}, | ||
}, | ||
required = {"endpoint_addr"}, | ||
} | ||
|
||
|
||
local metadata_schema = { | ||
type = "object", | ||
properties = { | ||
log_format = log_util.metadata_schema_log_format, | ||
}, | ||
} | ||
|
||
|
||
local _M = { | ||
version = 0.1, | ||
priority = 408, | ||
name = plugin_name, | ||
schema = schema, | ||
metadata_schema = metadata_schema, | ||
} | ||
|
||
|
||
function _M.check_schema(conf, schema_type) | ||
if schema_type == core.schema.TYPE_METADATA then | ||
return core.schema.check(metadata_schema, conf) | ||
end | ||
return core.schema.check(schema, conf) | ||
end | ||
|
||
|
||
local function send_http_data(conf, log_message) | ||
local err_msg | ||
local res = true | ||
local url_decoded = url.parse(conf.endpoint_addr) | ||
local host = url_decoded.host | ||
local port = url_decoded.port | ||
|
||
core.log.info("sending a batch logs to ", conf.endpoint_addr) | ||
|
||
local httpc = http.new() | ||
httpc:set_timeout(conf.timeout * 1000) | ||
local ok, err = httpc:connect(host, port) | ||
|
||
if not ok then | ||
return false, "failed to connect to host[" .. host .. "] port[" | ||
.. tostring(port) .. "] " .. err | ||
end | ||
|
||
local httpc_res, httpc_err = httpc:request({ | ||
method = "POST", | ||
path = "/v3/logs", | ||
body = log_message, | ||
headers = { | ||
["Host"] = url_decoded.host, | ||
["Content-Type"] = "application/json", | ||
} | ||
}) | ||
|
||
if not httpc_res then | ||
return false, "error while sending data to [" .. host .. "] port[" | ||
.. tostring(port) .. "] " .. httpc_err | ||
end | ||
|
||
-- some error occurred in the server | ||
if httpc_res.status >= 400 then | ||
res = false | ||
err_msg = "server returned status code[" .. httpc_res.status .. "] host[" | ||
.. host .. "] port[" .. tostring(port) .. "] " | ||
.. "body[" .. httpc_res:read_body() .. "]" | ||
end | ||
|
||
return res, err_msg | ||
end | ||
|
||
|
||
-- remove stale objects from the memory after timer expires | ||
local function remove_stale_objects(premature) | ||
if premature then | ||
return | ||
end | ||
|
||
for key, batch in ipairs(buffers) do | ||
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then | ||
core.log.warn("removing batch processor stale object, conf: ", | ||
core.json.delay_encode(key)) | ||
buffers[key] = nil | ||
end | ||
end | ||
|
||
stale_timer_running = false | ||
end | ||
|
||
|
||
function _M.log(conf, ctx) | ||
local metadata = plugin.plugin_metadata(plugin_name) | ||
core.log.info("metadata: ", core.json.delay_encode(metadata)) | ||
|
||
local log_body | ||
if metadata and metadata.value.log_format | ||
and core.table.nkeys(metadata.value.log_format) > 0 | ||
then | ||
log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format) | ||
else | ||
log_body = log_util.get_full_log(ngx, conf) | ||
end | ||
|
||
local trace_context | ||
local sw_header = ngx.req.get_headers()["sw8"] | ||
if sw_header then | ||
-- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT | ||
local ids = ngx_re.split(sw_header, '-') | ||
if #ids == 8 then | ||
trace_context = { | ||
traceId = base64.decode_base64url(ids[2]), | ||
traceSegmentId = base64.decode_base64url(ids[3]), | ||
spanId = tonumber(ids[4]) | ||
} | ||
else | ||
core.log.warn("failed to parse trace_context header: ", sw_header) | ||
end | ||
end | ||
|
||
local entry = { | ||
traceContext = trace_context, | ||
body = { | ||
json = { | ||
json = core.json.encode(log_body, true) | ||
} | ||
}, | ||
service = conf.service_name, | ||
serviceInstance = conf.service_instance_name, | ||
endpoint = ctx.var.uri, | ||
} | ||
|
||
if not stale_timer_running then | ||
-- run the timer every 30 mins if any log is present | ||
timer_at(1800, remove_stale_objects) | ||
stale_timer_running = true | ||
end | ||
|
||
local log_buffer = buffers[conf] | ||
|
||
if log_buffer then | ||
log_buffer:push(entry) | ||
return | ||
end | ||
|
||
-- Generate a function to be executed by the batch processor | ||
local func = function(entries, batch_max_size) | ||
local data, err = core.json.encode(entries) | ||
if not data then | ||
return false, 'error occurred while encoding the data: ' .. err | ||
end | ||
|
||
return send_http_data(conf, data) | ||
end | ||
|
||
local config = { | ||
name = conf.name, | ||
retry_delay = conf.retry_delay, | ||
batch_max_size = conf.batch_max_size, | ||
max_retry_count = conf.max_retry_count, | ||
buffer_duration = conf.buffer_duration, | ||
inactive_timeout = conf.inactive_timeout, | ||
route_id = ctx.var.route_id, | ||
server_addr = ctx.var.server_addr, | ||
} | ||
|
||
local err | ||
log_buffer, err = batch_processor:new(func, config) | ||
|
||
if not log_buffer then | ||
core.log.error("error when creating the batch processor: ", err) | ||
return | ||
end | ||
|
||
buffers[conf] = log_buffer | ||
log_buffer:push(entry) | ||
end | ||
|
||
|
||
return _M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
--- | ||
title: skywalking-logger | ||
--- | ||
|
||
<!-- | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
--> | ||
|
||
## Summary | ||
|
||
- [**Name**](#name) | ||
- [**Attributes**](#attributes) | ||
- [**How To Enable**](#how-to-enable) | ||
- [**Test Plugin**](#test-plugin) | ||
- [**Metadata**](#metadata) | ||
- [**Disable Plugin**](#disable-plugin) | ||
|
||
## Name | ||
|
||
`skywalking-logger` is a plugin which push Access Log data to `SkyWalking OAP` server over HTTP. If there is tracing context existing, it sets up the trace-log correlation automatically, and relies on [SkyWalking Cross Process Propagation Headers Protocol](https://skywalking.apache.org/docs/main/latest/en/protocols/skywalking-cross-process-propagation-headers-protocol-v3/). | ||
|
||
This will provide the ability to send Access Log as JSON objects to `SkyWalking OAP` server. | ||
|
||
## Attributes | ||
|
||
| Name | Type | Requirement | Default | Valid | Description | | ||
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- | | ||
| endpoint_addr | string | required | | | The URI of the `SkyWalking OAP` server. | | ||
| service_name | string | optional | "APISIX" | | service name for SkyWalking reporter. | | ||
| service_instance_name | string | optional |"APISIX Instance Name" | | service instance name for SkyWalking reporter, set it to `$hostname` to get local hostname directly.| | ||
| timeout | integer | optional | 3 | [1,...] | Time to keep the connection alive after sending a request. | | ||
| name | string | optional | "skywalking logger" | | A unique identifier to identity the logger. | | ||
| batch_max_size | integer | optional | 1000 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `SkyWalking OAP` server. | | ||
| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `SkyWalking OAP` server regardless of whether the number of logs in the buffer reaches the maximum number set. | | ||
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.| | ||
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | | ||
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | | ||
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. | | ||
|
||
## How To Enable | ||
|
||
The following is an example of how to enable the `skywalking-logger` for a specific route. Before that, an available `SkyWalking OAP` server was required and accessible. | ||
|
||
```shell | ||
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
{ | ||
"plugins": { | ||
"http-logger": { | ||
"endpoint_addr": "http://127.0.0.1:12800" | ||
} | ||
}, | ||
"upstream": { | ||
"type": "roundrobin", | ||
"nodes": { | ||
"127.0.0.1:1980": 1 | ||
} | ||
}, | ||
"uri": "/hello" | ||
}' | ||
``` | ||
|
||
## Test Plugin | ||
|
||
> success: | ||
```shell | ||
$ curl -i http://127.0.0.1:9080/hello | ||
HTTP/1.1 200 OK | ||
... | ||
hello, world | ||
``` | ||
|
||
Completion of the steps, could find the Log details on `SkyWalking UI`. | ||
|
||
## Metadata | ||
|
||
`skywalking-logger` also supports to custom log format like [http-logger](./http-logger.md). | ||
|
||
| Name | Type | Requirement | Default | Valid | Description | | ||
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- | | ||
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get `APISIX` variables or [Nginx variable](http://nginx.org/en/docs/varindex.html). | | ||
|
||
Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Route or Service which use `skywalking-logger` plugin. | ||
|
||
## Disable Plugin | ||
|
||
Remove the corresponding json configuration in the plugin configuration to disable the `skywalking-logger`. | ||
APISIX plugins are hot-reloaded, therefore no need to restart APISIX. | ||
|
||
```shell | ||
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' | ||
{ | ||
"uri": "/hello", | ||
"plugins": {}, | ||
"upstream": { | ||
"type": "roundrobin", | ||
"nodes": { | ||
"127.0.0.1:1980": 1 | ||
} | ||
} | ||
}' | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.