Skip to content

Instantly share code, notes, and snippets.

@hxhb
Created March 29, 2024 12:00
Show Gist options
  • Save hxhb/b19c7daa3ee4685e25e7f3e847909432 to your computer and use it in GitHub Desktop.
Save hxhb/b19c7daa3ee4685e25e7f3e847909432 to your computer and use it in GitHub Desktop.
import hashlib
import re
import random
import string
import time
import requests
import os
import csv
import secrets
import string
import argparse
from datetime import datetime
# Command-line interface: a single optional ``--md`` argument naming the
# Markdown file to convert (defaults to the empty string when omitted).
memos_parser = argparse.ArgumentParser(description="markdown parser for memos")
memos_parser.add_argument('--md', help='markdown file', default="")
def get_arg_by_name(parser_args, ArgName):
    """Return the value stored on ``parser_args`` under ``ArgName``.

    Only the instance attributes of ``parser_args`` (its ``__dict__``) are
    searched. ``None`` is returned when no attribute has that name.
    """
    attributes = parser_args.__dict__.items()
    return next((value for key, value in attributes if key == ArgName), None)
global_resource_id_counter = 1 # running ID counter for image resources
def convert_to_timestamp(date_str):
    """Convert a date string to an integer UNIX timestamp.

    Two layouts are recognised: ``YYYY-MM-DD HH:MM`` and ``YYYY.MM.DD HH:MM``.
    The parsed value is passed through ``time.mktime``, i.e. it is treated as
    local time.

    Raises:
        ValueError: if ``date_str`` matches neither layout.
    """
    # Shape-validating regex -> strptime format, tried in this order.
    layouts = {
        r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}$': "%Y-%m-%d %H:%M",
        r'^\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}$': "%Y.%m.%d %H:%M",
    }
    for shape, fmt in layouts.items():
        if re.match(shape, date_str) is None:
            continue
        parsed = datetime.strptime(date_str, fmt)
        return int(time.mktime(parsed.timetuple()))
    raise ValueError(f"Date format for '{date_str}' is not supported.")
def generate_resource_name(length=25):
    """Return a random identifier made of ``length`` ASCII letters and digits.

    Each character is drawn independently with ``secrets.choice``.
    """
    pool = string.ascii_letters + string.digits
    picked = [secrets.choice(pool) for _ in range(length)]
    return ''.join(picked)
def process_image_links(entry_content, created_ts, entry_id_counter, timeout=10):
    """Extract Markdown image links from a memo and describe them as resources.

    Args:
        entry_content: Markdown text of one memo.
        created_ts: timestamp stored as both ``created_ts`` and ``updated_ts``
            of every resource found.
        entry_id_counter: ID of the memo the images belong to; stored as
            ``memo_id`` on each resource.
        timeout: seconds to wait for the HTTP HEAD request that looks up the
            image size. Without a timeout an unresponsive host would stall
            the whole conversion.

    Returns:
        A tuple ``(cleaned_content, image_info_list)``: the text with every
        ``![alt](link)`` occurrence removed, and one dict per image in order
        of appearance.

    Side effects:
        Sends one HTTP HEAD request per image and advances the module-level
        ``global_resource_id_counter`` once per image.
    """
    global global_resource_id_counter
    image_link_pattern = re.compile(r'!\[.*?\]\((.*?)\)')
    image_info_list = []
    for external_link in image_link_pattern.findall(entry_content):
        filename = os.path.basename(external_link)
        # The type is derived purely from the file extension in the link.
        file_type = f"image/{os.path.splitext(filename)[1].lstrip('.')}"
        # Size lookup is deliberately best-effort: any failure (unreachable
        # host, relative path, malformed header) is reported and recorded
        # as size 0 so one bad link cannot abort the export.
        try:
            response = requests.head(external_link, timeout=timeout)
            # Header values are strings; store the size as an int in all cases.
            size = int(response.headers.get('content-length', 0))
        except Exception as e:
            print(f"Failed to get size for image {external_link}: {e}")
            size = 0
        image_info_list.append({
            'id': global_resource_id_counter,
            'resource_name': generate_resource_name(),
            'creator_id': 1,
            'created_ts': created_ts,
            'updated_ts': created_ts,
            'filename': filename,
            'blob': '',
            'external_link': external_link,
            'type': file_type,
            'size': size,
            'internal_path': '',
            'memo_id': entry_id_counter,  # ID of the owning memo, not of the image
        })
        global_resource_id_counter += 1
    # Strip the image markup from the memo text.
    entry_content_cleaned = image_link_pattern.sub('', entry_content)
    return entry_content_cleaned, image_info_list
def parse_markdown_to_csv(markdown_content):
    """Parse a Markdown journal into memo rows and image-resource rows.

    Every ``## `` marker at the start of a line begins one entry. The first
    non-empty line after the marker is the entry's timestamp; the remaining
    non-empty lines form its body (blank lines are dropped). Text before the
    first marker is ignored. Deeper headings such as ``### `` and ``## ``
    sequences in the middle of a line stay part of the body.

    Entries are processed from last to first, so the final entry in the file
    receives id 1. Entries with nothing after the marker, or with an empty
    body, are skipped and consume no id.

    Returns:
        A tuple ``(csv_data, resource_data)``: a list of memo row dicts and a
        list of image resource dicts as produced by ``process_image_links``.

    Raises:
        ValueError: propagated from ``convert_to_timestamp`` when an entry's
            first line is not a supported timestamp.
    """
    # Split only on line-initial "## " so sub-headings do not start an entry.
    entries = re.split(r'^## ', markdown_content.strip(), flags=re.MULTILINE)[1:]
    csv_data = []
    resource_data = []
    id_counter = 1  # memo ID counter
    for entry in reversed(entries):
        non_empty_lines = [line for line in entry.split('\n') if line.strip()]
        if not non_empty_lines:
            # Marker with nothing after it: there is no timestamp to read.
            continue
        timestamp_str = non_empty_lines[0].strip()
        content = '\n'.join(non_empty_lines[1:]).strip()
        timestamp = convert_to_timestamp(timestamp_str)
        if not content:
            # An empty body has neither text nor images; emit no memo for it.
            continue
        resource_name = generate_resource_name()
        clean_content, entry_resources = process_image_links(content, timestamp, id_counter)
        csv_data.append({
            'id': id_counter,
            'resource_name': resource_name,
            'creator_id': 1,
            'created_ts': timestamp,
            'updated_ts': timestamp,
            'row_status': 'NORMAL',
            'content': clean_content,
            'visibility': 'PRIVATE',
        })
        resource_data.extend(entry_resources)
        id_counter += 1
    return csv_data, resource_data
def write_csv_data(csv_data, file_path):
    """Write memo rows to ``file_path`` as a UTF-8 CSV file with a header row.

    ``csv_data`` is an iterable of dicts keyed by the memo column names.
    """
    memo_columns = [
        'id', 'resource_name', 'creator_id', 'created_ts',
        'updated_ts', 'row_status', 'content', 'visibility',
    ]
    with open(file_path, 'w', newline='', encoding='utf-8') as out:
        memo_writer = csv.DictWriter(out, fieldnames=memo_columns)
        memo_writer.writeheader()
        memo_writer.writerows(csv_data)
def write_image_info_csv(image_info_list, file_path):
    """Write image resource rows to ``file_path`` as a UTF-8 CSV file.

    A header row is written first, followed by one row per dict in
    ``image_info_list``.
    """
    resource_columns = [
        'id', 'resource_name', 'creator_id', 'created_ts', 'updated_ts',
        'filename', 'blob', 'external_link', 'type', 'size',
        'internal_path', 'memo_id',
    ]
    with open(file_path, 'w', newline='', encoding='utf-8') as out:
        resource_writer = csv.DictWriter(out, fieldnames=resource_columns)
        resource_writer.writeheader()
        resource_writer.writerows(image_info_list)
def main():
    """Convert the Markdown file given by ``--md`` into two CSV files.

    Writes ``<name>_memo.csv`` (memo rows) and ``<name>_resource.csv`` (image
    resource rows) into the directory of the input file, where ``<name>`` is
    the input file name without its extension.

    When ``--md`` is missing or does not name an existing file, the problem is
    reported through ``memos_parser.error`` (usage message, exit status 2)
    instead of silently doing nothing.
    """
    parser_args = memos_parser.parse_args()
    md_file = get_arg_by_name(parser_args, "md")
    if not md_file:
        memos_parser.error("--md is required: path to the Markdown file to convert")
    # isfile rather than exists: a directory path cannot be opened for reading.
    if not os.path.isfile(md_file):
        memos_parser.error(f"Markdown file not found: {md_file}")

    md_file = os.path.normpath(md_file)
    md_path = os.path.dirname(md_file)
    file_base_name = os.path.splitext(os.path.basename(md_file))[0]

    # Read the Markdown file.
    with open(md_file, 'r', encoding='utf-8') as file:
        markdown_content = file.read()

    # Parse the Markdown into memo rows and image resource rows.
    csv_data, resource_data = parse_markdown_to_csv(markdown_content)

    # Write both CSV files next to the input file.
    csv_file_path = os.path.join(md_path, file_base_name + '_memo.csv')
    write_csv_data(csv_data, csv_file_path)
    print(f'memo.csv file has been created at {csv_file_path}')

    resource_csv_file_path = os.path.join(md_path, file_base_name + '_resource.csv')
    write_image_info_csv(resource_data, resource_csv_file_path)
    print(f'memo resource.csv file has been created at {resource_csv_file_path}')
# Script entry point: run the conversion only when executed directly.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment