Skip to content

Instantly share code, notes, and snippets.

@bangyuwen
Created February 11, 2024 03:45
Show Gist options
  • Save bangyuwen/692c1e4cacc8a84b310a152742a5b4b6 to your computer and use it in GitHub Desktop.
Save bangyuwen/692c1e4cacc8a84b310a152742a5b4b6 to your computer and use it in GitHub Desktop.
import json
import shutil
import time
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
import fire
import pandas as pd
import requests
from loguru import logger
from requests.adapters import HTTPAdapter, Retry
# Everything the crawler persists (its log file and the NDJSON outputs) lives
# in a ".cache" directory located next to this script.
cache_folder = Path(__file__).resolve().parent / ".cache"
logger.info(f"Cache folder: {cache_folder}")

# Besides the default sink, also log to a file inside the cache folder, with
# daily rotation and one week of retention.
logger.add(
    cache_folder / "etf_crawler.log",
    retention="7 days",
    rotation="1 day",
)

# Newline-delimited JSON output files written by parse_and_write().
etf_status_ndjson = cache_folder / "etf_status.ndjson"
etf_stocks_ndjson = cache_folder / "etf_stocks.ndjson"
def get_payload(date_tw: datetime) -> dict | None:
session = requests.Session()
retries = Retry(
total=5,
backoff_factor=3,
status_forcelist=[502, 503, 504],
allowed_methods=None,
)
session.mount("https://", HTTPAdapter(max_retries=retries))
utc_datetime = date_tw.astimezone(ZoneInfo("UTC")).strftime(
"%Y-%m-%dT%H:%M:%S.000Z",
)
res = session.post(
********,
headers={"content-type": "application/json"},
data=********,
)
try:
res.raise_for_status()
return res.json()
except Exception as e:
logger.error(
f"post error, date_tw: {date_tw}, status code: {res.status_code}, error: {e}", # noqa: E501
)
return None
def parse_and_write(payload: dict):
    """Append one crawled payload to the NDJSON output files.

    ``payload["data"]["status"]`` becomes one line of ``etf_status_ndjson``
    and every element of ``payload["data"]["stocks"]`` becomes one line of
    ``etf_stocks_ndjson``.

    Args:
        payload: Decoded response body. It is only written when its ``code``
            field equals 200.

    Returns:
        None. Problems are logged rather than raised.
    """
    if not isinstance(payload, dict) or payload.get("code") != 200:
        logger.error(f"status code error, payload: {payload}")
        return
    try:
        # Serialize everything before touching either file, so a malformed
        # payload cannot leave a status line without its matching stocks.
        data = payload["data"]
        status_line = json.dumps(data["status"], ensure_ascii=False)
        stock_lines = [
            json.dumps(stock, ensure_ascii=False) for stock in data["stocks"]
        ]
        with etf_status_ndjson.open("a") as fp:
            fp.write(status_line + "\n")
        # Skip the write for an empty stock list; it would only add a blank
        # line to the file.
        if stock_lines:
            with etf_stocks_ndjson.open("a") as fp:
                fp.writelines(f"{line}\n" for line in stock_lines)
    except Exception as e:  # best-effort: log and move on to the next date
        logger.exception(f"{e}, payload: {payload}")
def uniq_files():
for file in [etf_status_ndjson, etf_stocks_ndjson]:
uniq_lines = set(file.read_text().splitlines())
file.write_text("\n".join(uniq_lines))
def main(
    no_cache: bool = False,
    start_date: str = "2022-10-20",
    end_date: str | None = None,
):
    """Crawl every business day in a date range and store the results.

    Args:
        no_cache: When true, delete the cache folder (previous output and
            logs) before crawling.
        start_date: First date to crawl, formatted ``YYYY-MM-DD``.
        end_date: Last date to crawl, formatted ``YYYY-MM-DD``. Defaults to
            today's date in Asia/Taipei, evaluated when the function is
            called.
    """
    if end_date is None:
        # Resolved per call; a default in the signature would be frozen at
        # import time.
        end_date = datetime.now(ZoneInfo("Asia/Taipei")).strftime("%Y-%m-%d")
    if no_cache:
        shutil.rmtree(cache_folder, ignore_errors=True)
    # rmtree ignores errors, so the folder may survive the removal above;
    # exist_ok tolerates that, and the folder is ensured in every mode.
    cache_folder.mkdir(parents=True, exist_ok=True)
    if no_cache:
        logger.info("clear cache folder")
    # freq="B" yields business days only. The dates are made tz-aware because
    # get_payload converts them to UTC, which pandas refuses to do for
    # tz-naive Timestamps.
    dates_tw = pd.date_range(start_date, end_date, freq="B", tz="Asia/Taipei")
    for date_tw in dates_tw:
        if payload := get_payload(date_tw):
            parse_and_write(payload)
            logger.info(f"crawled date: {date_tw}")
        # Pause between consecutive requests.
        time.sleep(2)
    uniq_files()
# Script entry point: hand main() to python-fire, which builds the
# command-line interface from it.
if __name__ == "__main__":
    fire.Fire(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment