| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- import time
- from io import BytesIO
- from pathlib import Path
- import oss2
- import uuid
- try:
- from PIL import Image
- except ImportError as e:
- raise ImportError("请先安装 Pillow: pip install Pillow") from e
- OSS_ACCESS_KEY_ID = 'LTAI5tDwjfteBvivYN41r8sJ'
- OSS_ACCESS_KEY_SECRET = 'yowuOGi2nYYnrqGpO3qcz94C4brcPp'
- OSS_ENDPOINT = "oss-cn-shenzhen.aliyuncs.com"
- OSS_BUCKET_NAME = "zhijiayun-jiansuo"
- # 目标体积:约 60~100 KB(JPEG)
- TARGET_MIN_BYTES = 60 * 1024
- TARGET_MAX_BYTES = 100 * 1024
- try:
- _RESAMPLE = Image.Resampling.LANCZOS
- except AttributeError:
- _RESAMPLE = Image.LANCZOS
- def _jpeg_bytes(img, quality):
- buf = BytesIO()
- img.save(buf, format="JPEG", quality=int(quality), optimize=True)
- return buf.getvalue()
- def _prepare_rgb(img):
- if img.mode in ("RGBA", "LA"):
- background = Image.new("RGB", img.size, (255, 255, 255))
- background.paste(img, mask=img.split()[-1])
- return background
- if img.mode == "P":
- img = img.convert("RGBA")
- background = Image.new("RGB", img.size, (255, 255, 255))
- background.paste(img, mask=img.split()[-1])
- return background
- return img.convert("RGB")
- def compress_image_to_jpeg_range(
- image_source,
- min_bytes=TARGET_MIN_BYTES,
- max_bytes=TARGET_MAX_BYTES, ):
- """
- 将图片压成 JPEG,尽量使体积落在 [min_bytes, max_bytes]。
- image_source: 本地路径(str) 或 原始字节(bytes)。
- """
- if isinstance(image_source, bytes):
- img = Image.open(BytesIO(image_source))
- else:
- img = Image.open(image_source)
- base = _prepare_rgb(img)
- scale = 1.0
- while scale >= 0.05:
- if scale < 1.0:
- w, h = base.size
- cur = base.resize(
- (max(1, int(w * scale)), max(1, int(h * scale))),
- _RESAMPLE,
- )
- else:
- cur = base
- lo, hi = 1, 95
- while lo <= hi:
- mid = (lo + hi) // 2
- data = _jpeg_bytes(cur, mid)
- n = len(data)
- if min_bytes <= n <= max_bytes:
- return data
- if n > max_bytes:
- hi = mid - 1
- else:
- lo = mid + 1
- n_min_q = len(_jpeg_bytes(cur, 1))
- if n_min_q > max_bytes:
- scale *= 0.82
- continue
- n_max_q = len(_jpeg_bytes(cur, 95))
- if n_max_q < min_bytes:
- return _jpeg_bytes(cur, 95)
- best = None
- best_dist = None
- target = (min_bytes + max_bytes) // 2
- for q in range(1, 96):
- data = _jpeg_bytes(cur, q)
- n = len(data)
- if min_bytes <= n <= max_bytes:
- return data
- dist = abs(n - target)
- if best_dist is None or dist < best_dist:
- best_dist = dist
- best = data
- return best
- return _jpeg_bytes(base, 85)
- class AliyunOSSUploader:
- """阿里云OSS上传工具类"""
- def __init__(self):
- self.auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
- self.bucket = oss2.Bucket(self.auth, f"https://{OSS_ENDPOINT}", OSS_BUCKET_NAME)
- self.bucket_name = OSS_BUCKET_NAME
- self.endpoint = OSS_ENDPOINT
- def upload_image(self, local_file_path, content_type='image/jpeg'):
- filename = Path(local_file_path).name
- object_name = f"screenshots/{str(time.strftime('%Y%m%d_%H%M%S'))}_{filename}"
- if not object_name.lower().endswith(('.jpg', '.jpeg')):
- object_name = object_name.rsplit('.', 1)[0] + '.jpg'
- compressed = compress_image_to_jpeg_range(local_file_path)
- headers = {'Content-Type': 'image/jpeg'}
- print(object_name)
- self.bucket.put_object(object_name, compressed, headers=headers)
- return f"https://{self.bucket_name}.{self.endpoint}/{object_name}"
- def upload_from_bytes(self, image_data, filename, content_type='image/jpeg'):
- filename = f"screenshots/{time.strftime('%Y%m%d_%H%M%S')}_{filename}.jpg"
- compressed = compress_image_to_jpeg_range(image_data)
- headers = {'Content-Type': 'image/jpeg'}
- self.bucket.put_object(filename, compressed, headers=headers)
- return f"https://{self.bucket_name}.{self.endpoint}/{filename}"
- # 使用示例
- if __name__ == "__main__":
- uploader = AliyunOSSUploader()
- # url = uploader.upload_image("screenshots/68906391.jpg", content_type='image/jpeg')
- # print(f"上传成功:{url}")
- image_data = open("screenshots/68906391.jpg", "rb")
- url = uploader.upload_image(image_data, "68906391",content_type='image/jpeg')
|