Skip to content

Commit

Permalink
feat: provide skywalking logger plugin (apache#5478)
Browse files Browse the repository at this point in the history
Co-authored-by: 罗泽轩 <spacewanderlzx@gmail.com>
Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
  • Loading branch information
3 people authored Nov 12, 2021
1 parent 0570d59 commit 5f6afdb
Show file tree
Hide file tree
Showing 8 changed files with 738 additions and 1 deletion.
233 changes: 233 additions & 0 deletions apisix/plugins/skywalking-logger.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local batch_processor = require("apisix.utils.batch-processor")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
local plugin = require("apisix.plugin")

local base64 = require("ngx.base64")
local ngx_re = require("ngx.re")

local ngx = ngx
local tostring = tostring
local tonumber = tonumber
local ipairs = ipairs
local timer_at = ngx.timer.at

local plugin_name = "skywalking-logger"
local stale_timer_running = false
local buffers = {}

local schema = {
type = "object",
properties = {
endpoint_addr = core.schema.uri_def,
service_name = {type = "string", default = "APISIX"},
service_instance_name = {type = "string", default = "APISIX Instance Name"},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "skywalking logger"},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
},
required = {"endpoint_addr"},
}


local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
},
}


local _M = {
version = 0.1,
priority = 408,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}


function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end


local function send_http_data(conf, log_message)
local err_msg
local res = true
local url_decoded = url.parse(conf.endpoint_addr)
local host = url_decoded.host
local port = url_decoded.port

core.log.info("sending a batch logs to ", conf.endpoint_addr)

local httpc = http.new()
httpc:set_timeout(conf.timeout * 1000)
local ok, err = httpc:connect(host, port)

if not ok then
return false, "failed to connect to host[" .. host .. "] port["
.. tostring(port) .. "] " .. err
end

local httpc_res, httpc_err = httpc:request({
method = "POST",
path = "/v3/logs",
body = log_message,
headers = {
["Host"] = url_decoded.host,
["Content-Type"] = "application/json",
}
})

if not httpc_res then
return false, "error while sending data to [" .. host .. "] port["
.. tostring(port) .. "] " .. httpc_err
end

-- some error occurred in the server
if httpc_res.status >= 400 then
res = false
err_msg = "server returned status code[" .. httpc_res.status .. "] host["
.. host .. "] port[" .. tostring(port) .. "] "
.. "body[" .. httpc_res:read_body() .. "]"
end

return res, err_msg
end


-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
return
end

for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
core.log.warn("removing batch processor stale object, conf: ",
core.json.delay_encode(key))
buffers[key] = nil
end
end

stale_timer_running = false
end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))

local log_body
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0
then
log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
else
log_body = log_util.get_full_log(ngx, conf)
end

local trace_context
local sw_header = ngx.req.get_headers()["sw8"]
if sw_header then
-- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
local ids = ngx_re.split(sw_header, '-')
if #ids == 8 then
trace_context = {
traceId = base64.decode_base64url(ids[2]),
traceSegmentId = base64.decode_base64url(ids[3]),
spanId = tonumber(ids[4])
}
else
core.log.warn("failed to parse trace_context header: ", sw_header)
end
end

local entry = {
traceContext = trace_context,
body = {
json = {
json = core.json.encode(log_body, true)
}
},
service = conf.service_name,
serviceInstance = conf.service_instance_name,
endpoint = ctx.var.uri,
}

if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end

local log_buffer = buffers[conf]

if log_buffer then
log_buffer:push(entry)
return
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err = core.json.encode(entries)
if not data then
return false, 'error occurred while encoding the data: ' .. err
end

return send_http_data(conf, data)
end

local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
route_id = ctx.var.route_id,
server_addr = ctx.var.server_addr,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[conf] = log_buffer
log_buffer:push(entry)
end


return _M
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ plugins: # plugin list (sorted by priority)
- datadog # priority: 495
- echo # priority: 412
- http-logger # priority: 410
- skywalking-logger # priority: 408
- sls-logger # priority: 406
- tcp-logger # priority: 405
- kafka-logger # priority: 403
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
"label": "Loggers",
"items": [
"plugins/http-logger",
"plugins/skywalking-logger",
"plugins/tcp-logger",
"plugins/kafka-logger",
"plugins/udp-logger",
Expand Down
117 changes: 117 additions & 0 deletions docs/en/latest/plugins/skywalking-logger.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
---
title: skywalking-logger
---

<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

## Summary

- [**Name**](#name)
- [**Attributes**](#attributes)
- [**How To Enable**](#how-to-enable)
- [**Test Plugin**](#test-plugin)
- [**Metadata**](#metadata)
- [**Disable Plugin**](#disable-plugin)

## Name

`skywalking-logger` is a plugin which push Access Log data to `SkyWalking OAP` server over HTTP. If there is tracing context existing, it sets up the trace-log correlation automatically, and relies on [SkyWalking Cross Process Propagation Headers Protocol](https://skywalking.apache.org/docs/main/latest/en/protocols/skywalking-cross-process-propagation-headers-protocol-v3/).

This will provide the ability to send Access Log as JSON objects to `SkyWalking OAP` server.

## Attributes

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| endpoint_addr | string | required | | | The URI of the `SkyWalking OAP` server. |
| service_name | string | optional | "APISIX" | | service name for SkyWalking reporter. |
| service_instance_name | string | optional |"APISIX Instance Name" | | service instance name for SkyWalking reporter, set it to `$hostname` to get local hostname directly.|
| timeout | integer | optional | 3 | [1,...] | Time to keep the connection alive after sending a request. |
| name | string | optional | "skywalking logger" | | A unique identifier to identity the logger. |
| batch_max_size | integer | optional | 1000 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `SkyWalking OAP` server. |
| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `SkyWalking OAP` server regardless of whether the number of logs in the buffer reaches the maximum number set. |
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |

## How To Enable

The following is an example of how to enable the `skywalking-logger` for a specific route. Before that, an available `SkyWalking OAP` server was required and accessible.

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"http-logger": {
"endpoint_addr": "http://127.0.0.1:12800"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
},
"uri": "/hello"
}'
```

## Test Plugin

> success:
```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```

Completion of the steps, could find the Log details on `SkyWalking UI`.

## Metadata

`skywalking-logger` also supports to custom log format like [http-logger](./http-logger.md).

| Name | Type | Requirement | Default | Valid | Description |
| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get `APISIX` variables or [Nginx variable](http://nginx.org/en/docs/varindex.html). |

Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Route or Service which use `skywalking-logger` plugin.

## Disable Plugin

Remove the corresponding json configuration in the plugin configuration to disable the `skywalking-logger`.
APISIX plugins are hot-reloaded, therefore no need to restart APISIX.

```shell
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'
```
1 change: 1 addition & 0 deletions docs/zh/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"label": "Loggers",
"items": [
"plugins/http-logger",
"plugins/skywalking-logger",
"plugins/tcp-logger",
"plugins/kafka-logger",
"plugins/udp-logger",
Expand Down
Loading

0 comments on commit 5f6afdb

Please sign in to comment.