-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.py
106 lines (93 loc) · 3.94 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import json
import typer
from pathlib import Path
import requests
from tqdm import tqdm
import xml.etree.ElementTree as ET
from typing_extensions import Annotated
import sqlite3
app = typer.Typer()
def get_safe_path(s: str) -> str:
"""
Remove invalid characters to sanitize a path.
:param s: str to sanitize
:returns: sanitized str
"""
ban_chars = "\\ / : * ? \" ' < > | $ \r \n".replace(
' ', '')
for i in ban_chars:
s = s.replace(i, "")
return s
def process_history(history: str):
if history.startswith("<?xml") or history.startswith("<msg>"):
try:
root = ET.fromstring(history)
title = root.find('.//title').text if root.find('.//title') is not None else None
quoted = root.find('.//refermsg/content').text if root.find('.//refermsg/content') is not None else None
if title and quoted:
return {
"title": title,
"quoted": process_history(quoted)
}
if title:
return title
except Exception:
return history
return history
def get_message(history: dict | str):
if isinstance(history, dict):
if 'title' in history:
return history['title']
else:
return history
def export_chathistory(user_id: str):
res = requests.get("http://localhost:48065/wechat/chatlog", params={
"userId": user_id,
"count": 100000
}).json()
for i in range(len(res['chatLogs'])):
res['chatLogs'][i]['content'] = process_history(res['chatLogs'][i]['content'])
res['chatLogs'][i]['message'] = get_message(res['chatLogs'][i]['content'])
return res['chatLogs']
@app.command()
def export_all(dest: Annotated[Path, typer.Argument(help="Destination path to export to.")]):
"""
Export all users' chat history to json files.
"""
if not dest.is_dir():
if not dest.exists():
inp = typer.prompt("Destination path does not exist, create it? (y/n)")
if inp.lower() == 'y':
dest.mkdir(parents=True)
else:
typer.echo("Aborted.", err=True)
return
else:
typer.echo("Destination path is not a directory!", err=True)
return
all_users = requests.get("http://localhost:48065/wechat/allcontacts").json()
for user in tqdm(all_users):
usr_chatlog = export_chathistory(user['arg'])
out_path = dest/get_safe_path((user['title'] or "")+"-"+user['arg']+'.json')
with open(out_path, 'w', encoding='utf-8') as f:
json.dump(usr_chatlog, f)
print(f"Exported {len(all_users)} users' chat history to {dest} in json.")
@app.command()
def export_sqlite(dest: Annotated[Path, typer.Argument(help="Destination path to export to.")] = Path("chatlog.db")):
"""
Export all users' chat history to a sqlite database.
"""
connection = sqlite3.connect(dest)
cursor = connection.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS chatlog (id INTEGER PRIMARY KEY AUTOINCREMENT, with_id TEXT, from_user TEXT, to_user TEXT, message TEXT, timest DATETIME, auxiliary TEXT)")
cursor.execute("CREATE INDEX IF NOT EXISTS chatlog_with_id_index ON chatlog (with_id)")
cursor.execute("CREATE TABLE iF NOT EXISTS users (id TEXT PRIMARY KEY, name TEXT)")
all_users = requests.get("http://localhost:48065/wechat/allcontacts").json()
for user in tqdm(all_users):
cursor.execute("INSERT OR IGNORE INTO users (id, name) VALUES (?, ?)", (user['arg'], user['title']))
usr_chatlog = export_chathistory(user['arg'])
for msg in usr_chatlog:
cursor.execute("INSERT INTO chatlog (with_id, from_user, to_user, message, timest, auxiliary) VALUES (?, ?, ?, ?, ?, ?)", (user['arg'], msg['fromUser'], msg['toUser'], msg['message'], msg['createTime'], str(msg['content'])))
connection.commit()
if __name__ == "__main__":
app()