Skip to content

Commit

Permalink
eos ram tick
Browse files Browse the repository at this point in the history
  • Loading branch information
foolcage committed Aug 2, 2018
1 parent bbfa8d2 commit af7983c
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 19 deletions.
25 changes: 12 additions & 13 deletions fooltrader/connector/kafka_connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import argparse
import datetime
import json
import logging
Expand Down Expand Up @@ -94,16 +93,16 @@ def cryptocurrency_tick_to_kafka(exchange, pairs=None):

if __name__ == '__main__':
# kdata_to_kafka(security_item='300027', fuquan='hfq')
# tick_to_kafka(security_item='300027')
tick_to_kafka(security_item='300027')
# cryptocurrency_tick_to_kafka('kraken')
parser = argparse.ArgumentParser()
parser.add_argument('security_type', help='the security type')
parser.add_argument('exchange', help='the exchange')
parser.add_argument('codes', nargs='+', help='the security code list')

args = parser.parse_args()

if args.security_type == 'cryptocurrency':
pairs = [code.replace('-', '/') for code in args.codes]

cryptocurrency_tick_to_kafka(exchange=args.exchange, pairs=pairs)
# parser = argparse.ArgumentParser()
# parser.add_argument('security_type', help='the security type')
# parser.add_argument('exchange', help='the exchange')
# parser.add_argument('codes', nargs='+', help='the security code list')
#
# args = parser.parse_args()
#
# if args.security_type == 'cryptocurrency':
# pairs = [code.replace('-', '/') for code in args.codes]
#
# cryptocurrency_tick_to_kafka(exchange=args.exchange, pairs=pairs)
2 changes: 1 addition & 1 deletion fooltrader/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
# CRYPTOCURRENCY_EXCHANGES = ["binance", "okex", "huobipro", "bitfinex", "bithumb", "gdax", "kraken", "hitbtc", "lbank",
# "bitz", "bibox", "zb", "bitstamp"]

CRYPTOCURRENCY_EXCHANGES = ["binance", "huobipro", "bitfinex"]
CRYPTOCURRENCY_EXCHANGES = ["binance", "huobipro", "bitfinex", "eos_contract"]

# CRYPTOCURRENCY_BASE = ["BTC", "ETH", "XRP", "BCH", "EOS", "LTC", "XLM", "ADA", "IOTA", "TRX", "NEO", "DASH", "XMR",
# "BNB", "ETC", "QTUM", "ONT"]
Expand Down
21 changes: 21 additions & 0 deletions fooltrader/datasource/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import pandas as pd

from fooltrader import get_security_list_path

df = pd.DataFrame()

# 去中心化交易所
df = df.append(
{
'code': 'RAM-EOS',
'name': 'RAM/EOS',
'listDate': '2018-06-09',
'timestamp': '2018-06-09',
'exchange': 'contract',
'type': 'cryptocurrency',
'id': "cryptocurrency_contract_ram-eos"
}, ignore_index=True)

if not df.empty:
df.to_csv(get_security_list_path(security_type='cryptocurrency', exchange='contract'),
index=False)
102 changes: 102 additions & 0 deletions fooltrader/datasource/eos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
import json
import logging
import time
from datetime import timedelta, datetime

from kafka import KafkaProducer
from pymongo import MongoClient

from fooltrader import to_time_str
from fooltrader.contract.kafka_contract import get_kafka_tick_topic
from fooltrader.settings import EOS_MONGODB_URL, KAFKA_HOST, TIME_FORMAT_MICRO
from fooltrader.utils.kafka_utils import get_latest_timestamp_order
from fooltrader.utils.utils import to_timestamp

logger = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=KAFKA_HOST)

client = MongoClient(EOS_MONGODB_URL)

db = client['eosMain']


def to_tick(item):
if item['action'] == 'buyrambytes' or item['action'] == 'buyram':
direction = 1
elif item['action'] == 'sellram':
direction = -1
else:
direction = 0

return {
'timestamp': to_time_str(item['block_time'], time_fmt=TIME_FORMAT_MICRO),
'securityId': 'cryptocurrency_contact_RAM-EOS',
'code': 'RAM-EOS',
'price': item['bytes'] / item['price'],
'direction': direction,
'volume': item['bytes'],
'turnover': item['price'],

'order': item['global_seq'],
'blockNumber': item['block_num'],
'action': item['action'],
'receiver': item['receiver'],
'trxId': item['trx_id'],
'operator': item['operator'],
'fee': item['fee']

}


def eos_ram_to_kafka():
security_id = 'cryptocurrency_contact_RAM-EOS'

latest_timestamp, latest_order = get_latest_timestamp_order(security_id)

topic = get_kafka_tick_topic(security_id)

if not latest_timestamp:
latest_timestamp = to_timestamp('2018-06-09')

start_date, end_date = evaluate_time_range(latest_timestamp)

while True:
if latest_order:
condition = {"block_time": {"$gte": start_date, "$lt": end_date},
"global_seq": {"$gt": latest_order}}
else:
condition = {"block_time": {"$gte": start_date, "$lt": end_date}}

for item in db.ram_trade.find(condition):
tick = to_tick(item)

record_meta = producer.send(topic,
bytes(json.dumps(tick, ensure_ascii=False), encoding='utf8'),
key=bytes(security_id, encoding='utf8'),
timestamp_ms=int(item['block_time'].timestamp() * 1000))
record = record_meta.get(10)

latest_timestamp = to_timestamp(record.timestamp)

latest_order = tick['order']

logger.debug("tick_to_kafka {}".format(tick))

if datetime.now() - latest_timestamp < timedelta(seconds=2):
time.sleep(1)

start_date, end_date = evaluate_time_range(latest_timestamp)


def evaluate_time_range(timestamp):
start_date = timestamp

end_date = start_date + timedelta(minutes=10)

return start_date, end_date


if __name__ == '__main__':
eos_ram_to_kafka()
11 changes: 9 additions & 2 deletions fooltrader/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@
g_http_proxy_items = []
g_socks2http_proxy_items = {}

TIME_FORMAT_MICRO = '%Y-%m-%d %H:%M:%S.%f'

TIME_FORMAT_SEC = '%Y-%m-%d %H:%M:%S'

TIME_FORMAT_DAY = '%Y-%m-%d'

# ES_HOSTS = ['172.16.92.200:9200']
ES_HOSTS = ['localhost:9200']
ES_HOSTS = ['172.16.92.200:9200']
# ES_HOSTS = ['localhost:9200']


# the action account settings
Expand All @@ -151,3 +153,8 @@
WEIXIN_APP_SECRECT = ""
if not WEIXIN_APP_SECRECT:
WEIXIN_APP_SECRECT = os.environ.get("WEIXIN_APP_SECRECT")

EOS_MONGODB_URL = ""

if not EOS_MONGODB_URL:
EOS_MONGODB_URL = os.environ.get("EOS_MONGODB_HOST")
41 changes: 41 additions & 0 deletions fooltrader/utils/kafka_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
import json
import logging

from kafka import KafkaConsumer, TopicPartition

from fooltrader import KAFKA_HOST
from fooltrader.contract.kafka_contract import get_kafka_tick_topic
from fooltrader.utils.utils import to_timestamp

logger = logging.getLogger(__name__)


def get_latest_timestamp_order(security_id):
topic = get_kafka_tick_topic(security_id)
consumer = KafkaConsumer(topic,
# client_id='fooltrader',
# group_id='fooltrader',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers=[KAFKA_HOST])
topic_partition = TopicPartition(topic=topic, partition=0)
end_offset = consumer.end_offsets([topic_partition])[topic_partition]
if end_offset > 0:
# partition assigned after poll, and we could seek
consumer.poll(5, 1)

consumer.seek(topic_partition, end_offset - 1)
message = consumer.poll(10000, 500)
msgs = message[topic_partition]
if len(msgs) > 0:
record = msgs[-1]
timestamp = to_timestamp(record.value['timestamp'])
order = None
if 'order' in record.value:
order = record.value['order']
return timestamp, order
return None, None


if __name__ == '__main__':
print(get_latest_timestamp_order('stock_sz_300027'))
7 changes: 5 additions & 2 deletions fooltrader/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from fooltrader.contract.data_contract import TICK_COL
from fooltrader.contract.files_contract import get_tick_path
from fooltrader.settings import TIME_FORMAT_DAY
from fooltrader.settings import TIME_FORMAT_DAY, TIME_FORMAT_MICRO

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -232,7 +232,10 @@ def to_timestamp(the_time):

def to_time_str(the_time, time_fmt=TIME_FORMAT_DAY):
try:
return to_timestamp(the_time).strftime(time_fmt)
if time_fmt == TIME_FORMAT_MICRO:
return to_timestamp(the_time).strftime(time_fmt)[0:-3]
else:
return to_timestamp(the_time).strftime(time_fmt)
except Exception as e:
return the_time

Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ Flask == 1.0

marshmallow

schedule
schedule

pymongo

0 comments on commit af7983c

Please sign in to comment.