Skip to content

Commit

Permalink
Add pulsar client
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Aug 27, 2020
1 parent 8cb5b10 commit 098ce82
Show file tree
Hide file tree
Showing 31 changed files with 1,762 additions and 2 deletions.
52 changes: 52 additions & 0 deletions cxx/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ set(CMAKE_PREFIX_PATH

set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

find_package(Boost REQUIRED)
message(STATUS "Boost_INCLUDE_DIRS: ${Boost_INCLUDE_DIRS}")

# find qbus::kafka's dependencies
find_path(LIBRDKAFKA_INCLUDE_DIR NAMES "librdkafka/rdkafka.h")
find_library(LIBRDKAFKA_LIB NAMES librdkafka.a)
Expand All @@ -46,6 +49,37 @@ add_definitions("-Wno-deprecated-declarations")
message(STATUS "LOG4CPLUS_INCLUDE_DIR=${LOG4CPLUS_INCLUDE_DIR}")
message(STATUS "LOG4CPLUS_LIB=${LOG4CPLUS_LIB}")

# find qbus::pulsar's dependencies
find_path(PULSAR_INCLUDE_PATH NAMES "pulsar/Client.h")
find_library(PULSAR_LIBRARY_PATH NAMES libpulsar.a)

# find libpulsar.a's dependencies
find_library(PROTOC_LIBRARY_PATH NAMES libprotoc.a)
find_library(PROTOBUF_LIBRARY_PATH NAMES libprotobuf.a)
find_library(PROTOBUF_LITE_LIBRARY_PATH NAMES libprotobuf-lite.a)
message(STATUS "PROTOC_LIBRARY_PATH=${PROTOC_LIBRARY_PATH}")
message(STATUS "PROTOBUF_LIBRARY_PATH=${PROTOBUF_LIBRARY_PATH}")
message(STATUS "PROTOBUF_LITE_LIBRARY_PATH=${PROTOBUF_LITE_LIBRARY_PATH}")
if (NOT PROTOC_LIBRARY_PATH OR NOT PROTOBUF_LIBRARY_PATH OR NOT PROTOBUF_LITE_LIBRARY_PATH)
message(FATAL_ERROR "Can't find Protobuf")
endif ()
set(PROTOBUF_LIBS ${PROTOC_LIBRARY_PATH} ${PROTOBUF_LIBRARY_PATH} ${PROTOBUF_LITE_LIBRARY_PATH})

find_library(BOOST_SYSTEM_LIBRARY NAMES libboost_system.a)
find_library(BOOST_REGEX_LIBRARY NAMES libboost_regex.a)
message(STATUS "BOOST_SYSTEM_LIBRARY=${BOOST_SYSTEM_LIBRARY}")
message(STATUS "BOOST_REGEX_LIBRARY=${BOOST_REGEX_LIBRARY}")
if (NOT BOOST_SYSTEM_LIBRARY OR NOT BOOST_REGEX_LIBRARY)
message(FATAL_ERROR "Can't find Boost.System and Boost.Regex")
endif ()
set(BOOST_LIBS ${BOOST_SYSTEM_LIBRARY} ${BOOST_REGEX_LIBRARY})

message(STATUS "Pulsar include path: ${PULSAR_INCLUDE_PATH}")
message(STATUS "Pulsar library path: ${PULSAR_LIBRARY_PATH}")
if (NOT PULSAR_INCLUDE_PATH OR NOT PULSAR_LIBRARY_PATH)
message(FATAL_ERROR "Pulsar not found")
endif ()

set(HEADERS
qbus_consumer.h
qbus_producer.h
Expand All @@ -64,6 +98,18 @@ set(SOURCES
kafka/qbus_topic_partition_set.cc
kafka/util/logger.cc
kafka/util/version.cc
pulsar/log4cplus_logger.cc
pulsar/property_tree_proxy.cc
pulsar/pulsar_config.cc
pulsar/qbus_consumer.cc
pulsar/qbus_consumer_config.cc
pulsar/qbus_php_consumer.cc
pulsar/qbus_php_producer.cc
pulsar/qbus_producer.cc
pulsar/qbus_producer_config.cc
pulsar/qbus_producer_map.cc
pulsar/retryable_send_callback.cc
pulsar/timer.cc
qbus_consumer.cc
qbus_producer.cc
)
Expand All @@ -76,16 +122,22 @@ include_directories(..
${EXTRA_INCLUDE_DIRS}
${LIBRDKAFKA_INCLUDE_DIR}
${LOG4CPLUS_INCLUDE_DIR}
${PULSAR_INCLUDE_PATH}
)
target_link_libraries(${LIBNAME}
${EXTRA_LIBS}
${LIBRDKAFKA_LIB}
${LOG4CPLUS_LIB}
# put it before PROTOBUF_LIBS and BOOST_LIBS which are its dependencies
${PULSAR_LIBRARY_PATH}
${PROTOBUF_LIBS}
${BOOST_LIBS}
dl
pthread
rt
z
ssl
curl
-static-libstdc++
)

Expand Down
41 changes: 41 additions & 0 deletions cxx/src/mq_type.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <cctype>
#include <algorithm>
#include <boost/property_tree/ini_parser.hpp>
namespace pt = boost::property_tree;

namespace qbus {

enum class MqType
{
KAFKA,
PULSAR
};

inline MqType mqType(const std::string& config_path) {
pt::ptree root;
try {
pt::ini_parser::read_ini(config_path, root);
} catch (const pt::ini_parser_error& e) {
throw std::runtime_error("Failed to parse " + config_path + ": " + e.what());
}

auto iter = root.find("mq.type");
if (iter == root.not_found()) {
return MqType::KAFKA; // default type is kafka
}

auto mq_type = iter->second.get_value<std::string>();
std::transform(mq_type.cbegin(), mq_type.cend(), mq_type.begin(), [](char ch) { return tolower(ch); });

if (mq_type == "kafka") {
return MqType::KAFKA;
} else if (mq_type == "pulsar") {
return MqType::PULSAR;
} else {
throw std::runtime_error("Unsupported mq.type: " + mq_type);
}
}

} // namespace qbus
61 changes: 61 additions & 0 deletions cxx/src/pulsar/log4cplus_logger.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include "log4cplus_logger.h"
#include <assert.h>
#include <mutex>
#include <log4cplus/logger.h>
#include <log4cplus/fileappender.h>
#include <log4cplus/consoleappender.h>

using namespace ::pulsar;

namespace qbus {
namespace pulsar {

static const log4cplus::LogLevel kLog4cplusLevel[4] = {log4cplus::DEBUG_LOG_LEVEL, log4cplus::INFO_LOG_LEVEL,
log4cplus::WARN_LOG_LEVEL, log4cplus::ERROR_LOG_LEVEL};

Log4cplusLogger::Log4cplusLogger(const std::string& filename)
: logger_(log4cplus::Logger::getRoot()), level_(LEVEL_DEBUG), filename_(filename) {}

bool Log4cplusLogger::isEnabled(Level level) { return level >= level_; }

void Log4cplusLogger::log(Level level, int line, const std::string& message) {
assert(level >= 0 && level < 4);
logger_.log(kLog4cplusLevel[level], message, filename_.c_str(), line);
}

void Log4cplusLogger::setLevel(const std::string& level) {
logger_.setLogLevel(log4cplus::LogLevelManager().fromString(level));
}

void Log4cplusLogger::setLogPath(const std::string& path) {
using namespace log4cplus;

const char* pattern = "[%p] [%D{%m/%d/%y %H:%M:%S,%q}] [%t] [%l] - %m %n";
std::auto_ptr<Layout> layout(new PatternLayout(pattern));
SharedAppenderPtr appender;
if (!path.empty()) {
appender = new RollingFileAppender(path, 100 * 1024 * 1024 /* 100 MB */);
} else {
appender = new ConsoleAppender();
}

appender->setLayout(layout);
logger_.removeAllAppenders();
logger_.addAppender(appender);
}

Log4cplusLoggerFactory::Log4cplusLoggerFactory(const std::string& level, const std::string& log_path)
: level_(level), log_path_(log_path) {
static std::once_flag flag;
std::call_once(flag, [] { log4cplus::initialize(); });
}

::pulsar::Logger* Log4cplusLoggerFactory::getLogger(const std::string& filename) {
auto log4cplus_logger = new Log4cplusLogger(filename);
log4cplus_logger->setLevel(level_);
log4cplus_logger->setLogPath(log_path_);
return static_cast<::pulsar::Logger*>(log4cplus_logger);
}

} // namespace pulsar
} // namespace qbus
52 changes: 52 additions & 0 deletions cxx/src/pulsar/log4cplus_logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <atomic>
#include <memory>
#include <mutex>

#include <pulsar/Logger.h>
#include <log4cplus/logger.h>

namespace qbus {
namespace pulsar {

class Log4cplusLogger final : public ::pulsar::Logger {
public:
Log4cplusLogger(const std::string& filename);

bool isEnabled(Level level) override;
void log(Level level, int line, const std::string& message) override;

void setLevel(const std::string& level);
void setLogPath(const std::string& path);

private:
log4cplus::Logger logger_;
Level level_;
const std::string filename_;

#ifdef NOT_USE_CONSUMER_CALLBACK
static std::atomic_int kInitialized;

public:
static void uninit();
#endif
};

class Log4cplusLoggerFactory final : public ::pulsar::LoggerFactory {
public:
Log4cplusLoggerFactory(const std::string& level, const std::string& log_path);

::pulsar::Logger* getLogger(const std::string& filename) override;

void setLevel(const std::string& level) { level_ = level; }
void setLogPath(const std::string& log_path) { log_path_ = log_path; }

private:
std::string level_ = "debug";
std::string log_path_;
mutable std::mutex mutex_;
};

} // namespace pulsar
} // namespace qbus
99 changes: 99 additions & 0 deletions cxx/src/pulsar/log_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <string>
#include <sstream>
#include <memory>

#include <pulsar/defines.h>
#include <pulsar/Logger.h>

namespace pulsar {

#ifdef __GNUC__
#define PULSAR_UNLIKELY(expr) __builtin_expect(expr, 0)
#else
#define PULSAR_UNLIKELY(expr) (expr)
#endif

#define DECLARE_LOG_OBJECT() \
static ::pulsar::Logger* logger() { \
static thread_local std::unique_ptr<::pulsar::Logger> threadSpecificLogPtr; \
::pulsar::Logger* ptr = threadSpecificLogPtr.get(); \
if (PULSAR_UNLIKELY(!ptr)) { \
std::string logger = ::pulsar::LogUtils::getLoggerName(__FILE__); \
threadSpecificLogPtr.reset(::pulsar::LogUtils::getLoggerFactory()->getLogger(logger)); \
ptr = threadSpecificLogPtr.get(); \
} \
return ptr; \
}

#define LOG_DEBUG(message) \
{ \
if (PULSAR_UNLIKELY(logger()->isEnabled(::pulsar::Logger::LEVEL_DEBUG))) { \
std::stringstream ss; \
ss << __FUNCTION__ << " | "; \
ss << message; \
logger()->log(::pulsar::Logger::LEVEL_DEBUG, __LINE__, ss.str()); \
} \
}

#define LOG_INFO(message) \
{ \
if (logger()->isEnabled(::pulsar::Logger::LEVEL_INFO)) { \
std::stringstream ss; \
ss << __FUNCTION__ << " | "; \
ss << message; \
logger()->log(::pulsar::Logger::LEVEL_INFO, __LINE__, ss.str()); \
} \
}

#define LOG_WARN(message) \
{ \
if (logger()->isEnabled(::pulsar::Logger::LEVEL_WARN)) { \
std::stringstream ss; \
ss << __FUNCTION__ << " | "; \
ss << message; \
logger()->log(::pulsar::Logger::LEVEL_WARN, __LINE__, ss.str()); \
} \
}

#define LOG_ERROR(message) \
{ \
if (logger()->isEnabled(::pulsar::Logger::LEVEL_ERROR)) { \
std::stringstream ss; \
ss << __FUNCTION__ << " | "; \
ss << message; \
logger()->log(::pulsar::Logger::LEVEL_ERROR, __LINE__, ss.str()); \
} \
}

class PULSAR_PUBLIC LogUtils {
public:
static void init(const std::string& logConfFilePath);

static void setLoggerFactory(std::unique_ptr<LoggerFactory> loggerFactory);

static LoggerFactory* getLoggerFactory();

static std::string getLoggerName(const std::string& path);
};

} // namespace pulsar
16 changes: 16 additions & 0 deletions cxx/src/pulsar/make_unique.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <memory>

#if __cplusplus < 201402L

namespace std {

template <typename T, typename... Args>
inline std::unique_ptr<T> make_unique(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace std

#endif
15 changes: 15 additions & 0 deletions cxx/src/pulsar/property_tree_proxy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "property_tree_proxy.h"
#include "log_utils.h"

DECLARE_LOG_OBJECT()

namespace qbus {
namespace pulsar {

void PropertyTreeProxy::handleBadDataError(const char* key, const std::string& value, const char* type,
const pt::ptree_bad_data& e) {
LOG_WARN("Failed to convert " << value << "(of key: " << key << ") to " << type);
}

} // namespace pulsar
} // namespace qbus
Loading

0 comments on commit 098ce82

Please sign in to comment.