Skip to content

Commit

Permalink
sync master and dev branch to v1.3.1
Browse files Browse the repository at this point in the history
sync master and dev branch to v1.3.1
  • Loading branch information
fisco-dev authored Jul 9, 2018
2 parents b2ac33a + 8ff11e9 commit 9e17a38
Show file tree
Hide file tree
Showing 17 changed files with 3,791 additions and 368 deletions.
6 changes: 4 additions & 2 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
### V1.3.1

* Update

1. nodejs支持发送国密算法交易
1. 节点建立连接时过滤自身的ip端口
2. 修复使用web3sdk会偶尔断开无法重连的bug
3. 修复节点间连接会偶尔断开无法重连的bug
4. nodejs支持发送国密算法交易

* Add

Expand Down
1 change: 1 addition & 0 deletions libchannelserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ file(GLOB HEADERS "*.h")
add_library(channelserver ${SRC_LIST} ${HEADERS})
target_include_directories(channelserver PRIVATE ..)
target_include_directories(channelserver PUBLIC ${BOOST_INCLUDE_DIR})

eth_use(channelserver OPTIONAL OpenSSL)
target_link_libraries(channelserver ${OPENSSL_SSL_LIBRARIE} ${OPENSSL_CRYPTO_LIBRARIE} krb5 z dl)
199 changes: 172 additions & 27 deletions libchannelserver/ChannelMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,69 +26,214 @@
#include <string>
#include <thread>
#include <queue>
#include <boost/lexical_cast.hpp>
#include <libdevcore/FixedHash.h>
#include <libdevcore/easylog.h>
#include <arpa/inet.h>
#include "http_parser.h"
#include "ChannelException.h"

namespace dev
{

namespace channel {

struct Message {
class Message: public std::enable_shared_from_this<Message> {
public:
typedef std::shared_ptr<Message> Ptr;
//包头组成,长度 + 类型 + seq(32字节UUID) + 结果
const static size_t HEADER_LENGTH = 4 + 2 + 32 + 4;
const static size_t MAX_LENGTH = 1024 * 1024 * 1024;

uint32_t length = 0; //可判断http,走原http流程
uint16_t type = 0;
std::string seq = ""; //32字节
int result = 0;

std::shared_ptr<bytes> data;

Message() {
data = std::make_shared<bytes>();
_data = std::make_shared<bytes>();
}

void encode(bytes &buffer) {
uint32_t lengthN = htonl(HEADER_LENGTH + data->size());
uint16_t typeN = htons(type);
virtual ~Message() {}
//uint32_t seqN = htonl(seq);
int32_t resultN = htonl(result);
virtual void encode(bytes &buffer) {
uint32_t lengthN = htonl(HEADER_LENGTH + _data->size());
uint16_t typeN = htons(_type);
int32_t resultN = htonl(_result);

buffer.insert(buffer.end(), (byte*) &lengthN, (byte*) &lengthN + sizeof(lengthN));
buffer.insert(buffer.end(), (byte*) &typeN, (byte*) &typeN + sizeof(typeN));
//buffer.insert(buffer.end(), (byte*) &seqN, (byte*) &seqN + sizeof(seqN));
buffer.insert(buffer.end(), seq.data(), seq.data() + seq.size());
buffer.insert(buffer.end(), _seq.data(), _seq.data() + _seq.size());
buffer.insert(buffer.end(), (byte*) &resultN, (byte*) &resultN + sizeof(resultN));

buffer.insert(buffer.end(), data->begin(), data->end());
buffer.insert(buffer.end(), _data->begin(), _data->end());
}

virtual ssize_t decode(const byte* buffer, size_t size) {
return decodeAMOP(buffer, size);
}

virtual bool isHttp(const byte* buffer, size_t size) {
std::string cmd = std::string((const char*)buffer, size);

if(cmd.find_first_of("GET ") == 0 || cmd.find_first_of("POST") == 0) {
return true;
}

return false;
}

virtual void onHttpBody(const char* buffer, size_t size) {
_data->assign((const byte*)buffer, (const byte*)buffer + size);
}

virtual ssize_t decodeHttp(const byte* buffer, size_t size) {
//解析http协议
std::shared_ptr<http_parser> httpParser = std::make_shared<http_parser>();
http_parser_init(httpParser.get(), HTTP_REQUEST);

std::shared_ptr<http_parser_settings>httpParserSettings = std::make_shared<http_parser_settings>();
http_parser_settings_init(httpParserSettings.get());

httpParserSettings->onBody = std::bind(&Message::onHttpBody, shared_from_this(), std::placeholders::_1, std::placeholders::_2);

unsigned long count = http_parser_execute(httpParser.get(), httpParserSettings.get(), (const char*)buffer, size);

if(httpParser->http_errno != 0) {
throw(ChannelException(-1, "解析http请求错误:" + boost::lexical_cast<std::string>(httpParser->http_errno)));
}

if(!_data->empty()) {
//解析完成
//以太坊请求
_length = _data->size();
_type = 0x20;
_seq = "";
_result = 0;

return _data->size();
}

return 0;
}

ssize_t decode(const byte* buffer, size_t size) {
virtual ssize_t decodeAMOP(const byte* buffer, size_t size) {
if(size < HEADER_LENGTH) {
return 0;
}

length = ntohl(*((uint32_t*)&buffer[0]));
_length = ntohl(*((uint32_t*) &buffer[0]));

if(length > MAX_LENGTH) {
if (_length > MAX_LENGTH) {
return -1;
}

if(size < length) {
if (size < _length) {
return 0;
}

type = ntohs(*((uint16_t*)&buffer[4]));
seq = ntohl(*((uint32_t*)&buffer[6]));
seq.assign(&buffer[6], &buffer[6] + 32);
result = ntohl(*((uint32_t*)&buffer[38]));
_type = ntohs(*((uint16_t*) &buffer[4]));
_seq.assign(&buffer[6], &buffer[6] + 32);
_result = ntohl(*((uint32_t*) &buffer[38]));

_data->assign(&buffer[HEADER_LENGTH],
&buffer[HEADER_LENGTH] + _length - HEADER_LENGTH);

return _length;
}

virtual uint32_t length() { return _length; }

virtual uint16_t type() { return _type; }
virtual void setType(uint16_t type) { _type = type; }

virtual std::string seq() { return _seq; }
virtual void setSeq(std::string seq) { _seq = seq; }

virtual int result() { return _result; }
virtual void setResult(int result) { _result = result; }

virtual byte* data() { return _data->data(); }
virtual size_t dataSize() { return _data->size(); }

virtual void setData(const byte *p, size_t size) { _data->assign(p, p + size); }

virtual void clearData() { _data->clear(); }

protected:
//最小包头,兼容http
const static size_t MIN_HEADER_LENGTH = 4;

data->assign(&buffer[HEADER_LENGTH], &buffer[HEADER_LENGTH] + length - HEADER_LENGTH);
//包头组成,长度 + 类型 + seq(32字节UUID) + 结果
const static size_t HEADER_LENGTH = 4 + 2 + 32 + 4;
const static size_t MAX_LENGTH = 1024 * 1024 * 1024; //最大支持1M的消息

uint32_t _length = 0; //可判断http,走原http流程
uint16_t _type = 0;
std::string _seq = std::string(32, '0'); //32字节seq
int _result = 0;

std::shared_ptr<bytes> _data;
};

class TopicMessage: public Message {
public:
typedef std::shared_ptr<TopicMessage> Ptr;

TopicMessage() {};

TopicMessage(Message *message) {
_length = message->length();
_type = message->type();
_seq = message->seq();
_result = message->result();

_data = std::make_shared<bytes>(message->data(), message->data() + message->dataSize());
}

virtual ~TopicMessage() {}

virtual std::string topic() {
if(!(_type == 0x30 || _type == 0x31)) {
throw(ChannelException(-1, "type: " + boost::lexical_cast<std::string>(_type) + " 非ChannelMessage"));
}

if(_data->size() < 1) {
throw(ChannelException(-1, "错误,消息长度为0"));
}

uint8_t topicLen = *((uint8_t*)_data->data());

if(_data->size() < topicLen) {
throw(ChannelException(-1, "错误,topic长度不足 topicLen: " + boost::lexical_cast<std::string>(topicLen) + " size:" + boost::lexical_cast<std::string>(_data->size())));
}

std::string topic((char*)_data->data() + 1, topicLen - 1);

return topic;
}

virtual void setTopic(const std::string &topic) {
if(_data->size() > 0) {
throw(ChannelException(-1, "禁止在已有data时设置topic"));
}

_data->push_back((char) topic.size() + 1);
_data->insert(_data->end(), topic.begin(), topic.end());
}

virtual byte* data() override {
std::string topic = this->topic();

return _data->data() + 1 + topic.size();
}

virtual size_t dataSize() override {
std::string topic = this->topic();

return _data->size() - 1 - topic.size();
}

virtual void setData(const byte *p, size_t size) override {
if(_data->empty()) {
throw(ChannelException(-1, "禁止在无topic时设置data"));
}

return length;
_data->insert(_data->end(), p, p + size);
}
};

Expand Down
Loading

0 comments on commit 9e17a38

Please sign in to comment.