-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ISSUE-6514 Add support for influxdb2 in pulsar-influxdb-sink(#6514) (…
…#6601) Fixes #6514 ### Motivation The latest InfluxDB release(v2) has changed their write path, 'org'、'bucket'、'token' are required properties to communicate with it; New InfluxDB client library is provided with different package name, from org.influxdb to com.influxdb This PR would add support to the latest InfluxDB 2.0 write API. ### Modifications Since the influxdb2 client library changes a lot, even the package name changed, so the new client library is added in this pull request, while the old one is kept, which means the implement of client v1 is kept unchanged, so no migration is needed for influxdbv1 users。 The sink for influxdbv1 fails to decode messages encoded in json, it supposes only avro. Support of json schema is added in this pull request. The sink for influxdbv1 requires influxdb fields as schema fields, so users should define schemas for each kind of device, that is not convenient. The new added sink for influxdbv2 redefines the message schema, and wraps influxdb 'fields' in one map field, just like 'tags' which is already defined as map in the previous sink for influxdbv1. ### Verifying this change - Unit tests are added for this sink for influxdbv2.
- Loading branch information
1 parent
3e28be0
commit aeabebf
Showing
20 changed files
with
1,241 additions
and
360 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
129 changes: 129 additions & 0 deletions
129
pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.influxdb; | ||
|
||
import com.google.common.collect.Lists; | ||
import lombok.extern.slf4j.Slf4j; | ||
import lombok.val; | ||
import org.apache.commons.collections.CollectionUtils; | ||
import org.apache.pulsar.client.api.schema.Field; | ||
import org.apache.pulsar.client.api.schema.GenericRecord; | ||
import org.apache.pulsar.functions.api.Record; | ||
import org.apache.pulsar.io.core.Sink; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Pulsar sink which can write data to target in batch. | ||
* | ||
* @param <T> The type of data to write to target database | ||
* @param <R> Pulsar message type, such as GenericRecord | ||
*/ | ||
@Slf4j | ||
public abstract class BatchSink<T, R> implements Sink<R> { | ||
private int batchSize; | ||
private List<Record<R>> incomingList; | ||
private ScheduledExecutorService flushExecutor; | ||
|
||
protected void init(long batchTimeMs, int batchSize) { | ||
this.batchSize = batchSize; | ||
|
||
incomingList = Lists.newArrayList(); | ||
flushExecutor = Executors.newSingleThreadScheduledExecutor(); | ||
flushExecutor.scheduleAtFixedRate(this::flush, batchTimeMs, batchTimeMs, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
@Override | ||
final public void write(Record<R> record) { | ||
int currentSize; | ||
synchronized (this) { | ||
if (null != record) { | ||
incomingList.add(record); | ||
} | ||
currentSize = incomingList.size(); | ||
} | ||
|
||
if (currentSize >= batchSize) { | ||
flushExecutor.submit(this::flush); | ||
} | ||
} | ||
|
||
private void flush() { | ||
List<Record<R>> toFlushList; | ||
|
||
synchronized (this) { | ||
if (incomingList.isEmpty()) { | ||
return; | ||
} | ||
toFlushList = incomingList; | ||
incomingList = Lists.newArrayList(); | ||
} | ||
|
||
val points = Lists.<T>newArrayListWithExpectedSize(toFlushList.size()); | ||
if (CollectionUtils.isNotEmpty(toFlushList)) { | ||
for (Record<R> record: toFlushList) { | ||
try { | ||
points.add(buildPoint(record)); | ||
} catch (Exception e) { | ||
record.fail(); | ||
toFlushList.remove(record); | ||
log.warn("Record flush thread was exception ", e); | ||
} | ||
} | ||
} | ||
|
||
try { | ||
if (CollectionUtils.isNotEmpty(points)) { | ||
writePoints(points); | ||
} | ||
toFlushList.forEach(Record::ack); | ||
points.clear(); | ||
toFlushList.clear(); | ||
} catch (Exception e) { | ||
toFlushList.forEach(Record::fail); | ||
log.error("InfluxDB write batch data exception ", e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
if (null != flushExecutor) { | ||
flushExecutor.shutdown(); | ||
} | ||
} | ||
|
||
protected Object getFiled(GenericRecord record, String fieldName) { | ||
List<Field> fields = record.getFields(); | ||
val fieldMatch = fields.stream() | ||
.filter(field -> fieldName.equals(field.getName())) | ||
.findAny() | ||
.orElse(null); | ||
if (null != fieldMatch) { | ||
return record.getField(fieldMatch); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
protected abstract T buildPoint(Record<R> message) throws Exception; | ||
protected abstract void writePoints(List<T> points) throws Exception; | ||
} |
156 changes: 0 additions & 156 deletions
156
pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBAbstractSink.java
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.