# Source: https://github.com/apachecn/epub-crawler.git
# -*- coding: utf-8 -*-

import requests
from imgyaso import pngquant_bts, \
    adathres_bts, grid_bts, noise_bts, trunc_bts
import re
import os
import shutil
import tempfile
import sys
from os import path
import uuid
import json

RE_DATA_URL = r'^data:image/\w+;base64,'
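
# Usage sketch (illustrative only): RE_DATA_URL is presumably used elsewhere in
# the crawler to detect inline base64 images, e.g.:
#
#   re.search(RE_DATA_URL, 'data:image/png;base64,iVBORw0...')   # matches
#   re.search(RE_DATA_URL, 'https://example.com/a.png')          # no match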

bundle_dir = tempfile.gettempdir()
cache_dir = 'epubcrawler'

is_pic = lambda x: x.endswith((
    '.jpg', '.jpeg', '.png', '.gif',
    '.tiff', '.bmp', '.webp',
))

def safe_mkdir(dir):
    # Create the directory; ignore the error if it already exists.
    try: os.mkdir(dir)
    except OSError: pass

def safe_rmdir(dir):
    # Remove the whole directory tree; ignore the error if it is missing.
    try: shutil.rmtree(dir)
    except OSError: pass

def request_retry(method, url, retry=10, check_status=False, **kw):
    # Issue an HTTP request, retrying up to `retry` times; re-raise on the last failure.
    kw.setdefault('timeout', 10)
    for i in range(retry):
        try:
            r = requests.request(method, url, **kw)
            if check_status: r.raise_for_status()
            return r
        except KeyboardInterrupt:
            raise
        except Exception as e:
            print(f'{url} retry {i}')
            if i == retry - 1: raise e

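# A minimal usage sketch (hypothetical URL; assumes network access):
#
#   r = request_retry('GET', 'https://example.com/page.html', check_status=True)
#   html = r.text
#
# Any keyword argument accepted by requests.request() (headers, proxies, ...)
# is passed through via **kw; the timeout defaults to 10 seconds.
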
def opti_img(img, mode, colors):
    # Compress the image bytes with the chosen imgyaso mode; unknown modes are a no-op.
    if mode == 'quant':
        return pngquant_bts(img, colors)
    elif mode == 'grid':
        return grid_bts(img)
    elif mode == 'trunc':
        return trunc_bts(img, colors)
    elif mode == 'thres':
        return adathres_bts(img)
    else:
        return img

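# A minimal usage sketch (hypothetical file name; 8 colors chosen arbitrarily):
#
#   raw = open('cover.png', 'rb').read()
#   small = opti_img(raw, 'quant', 8)
#   open('cover-quant.png', 'wb').write(small)
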
def safe_remove(name):
    # Delete the file; ignore the error if it does not exist.
    try: os.remove(name)
    except OSError: pass

def load_module(fname):
    # Import an external *.py file: copy it into a temp dir under a unique
    # module name, import that copy, then delete it.
    if not path.isfile(fname) or \
        not fname.endswith('.py'):
        raise FileNotFoundError('the external module should be a *.py file')
    tmpdir = path.join(tempfile.gettempdir(), 'load_module')
    safe_mkdir(tmpdir)
    if tmpdir not in sys.path:
        sys.path.insert(0, tmpdir)
    mod_name = 'x' + uuid.uuid4().hex
    nfname = path.join(tmpdir, mod_name + '.py')
    shutil.copy(fname, nfname)
    mod = __import__(mod_name)
    safe_remove(nfname)
    return mod

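# A minimal usage sketch (hypothetical plugin file and attribute):
#
#   mod = load_module('my_hooks.py')
#   mod.process(...)   # whatever the loaded file happens to define
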
def load_article(hash):
    # Read a cached article; return None when the cache entry is missing or invalid.
    fname = path.join(bundle_dir, cache_dir, f'{hash}.json')
    if not path.isfile(fname):
        return None
    try:
        art = json.loads(open(fname, encoding='utf-8').read())
    except Exception as ex:
        print(ex)
        return None

    if isinstance(art, dict) and \
        art.get('title', '') and \
        art.get('content', ''):
        return art
    else:
        return None

def save_article(hash, art):
    # Write the article dict into the cache directory as JSON.
    dir = path.join(bundle_dir, cache_dir)
    safe_mkdir(dir)
    fname = path.join(dir, f'{hash}.json')
    open(fname, 'w', encoding='utf-8').write(json.dumps(art))

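# A minimal cache round-trip sketch (hypothetical hash value):
#
#   h = '0123456789abcdef0123456789abcdef'
#   save_article(h, {'title': 'Example', 'content': '<p>hello</p>'})
#   assert load_article(h)['title'] == 'Example'
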
def load_img(hash, opti):
    # Read a cached optimized image; return None when missing or empty.
    fname = path.join(bundle_dir, cache_dir, f'{hash}-{opti}.png')
    if not path.isfile(fname):
        return None
    img = open(fname, 'rb').read()
    if img != b'':
        return img
    else:
        return None

def save_img(hash, opti, img):
    # Write optimized image bytes into the cache directory.
    dir = path.join(bundle_dir, cache_dir)
    safe_mkdir(dir)
    fname = path.join(dir, f'{hash}-{opti}.png')
    open(fname, 'wb').write(img)

def size_str_to_int(s):
    # Parse a human-readable size such as '4m' or '1.5g' into bytes.
    # Suffixes beyond 'y' are non-standard and simply continue the 2**10 steps.
    factor_map = {
        '' : 1,
        'k': 1 << 10,
        'm': 1 << 20,
        'g': 1 << 30,
        't': 1 << 40,
        'p': 1 << 50,
        'e': 1 << 60,
        'z': 1 << 70,
        'y': 1 << 80,
        'b': 1 << 90,
        'n': 1 << 100,
        'd': 1 << 110,
        'c': 1 << 120,
        'x': 1 << 130,
    }
    suf = ''.join(factor_map.keys())
    m = re.search(r'^(\d+(?:\.\d+)?)([' + suf + r']?)$', s.lower())
    if not m: return -1
    base = float(m.group(1))
    factor = factor_map[m.group(2)]
    return int(base * factor)

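# Worked examples:
#
#   size_str_to_int('4m')    # 4 * 2**20   -> 4194304
#   size_str_to_int('1.5k')  # 1.5 * 2**10 -> 1536
#   size_str_to_int('100')   # no suffix   -> 100
#   size_str_to_int('oops')  # no match    -> -1
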
def extname(fname):
    # Return the file extension without the dot, or '' if there is none.
    m = re.search(r'\.(\w+)$', fname)
    return m.group(1) if m else ''