"""Compress images to a target JPEG size and upload them to Aliyun OSS."""
import os
import time
import uuid
from io import BytesIO
from pathlib import Path

import oss2

try:
    from PIL import Image
except ImportError as e:
    raise ImportError("请先安装 Pillow: pip install Pillow") from e

# SECURITY: credentials should not live in source control. They can be
# supplied through the environment variables of the same name; the literals
# below are kept only as a backward-compatible fallback and should be rotated
# and removed.
OSS_ACCESS_KEY_ID = os.environ.get(
    "OSS_ACCESS_KEY_ID", 'LTAI5tDwjfteBvivYN41r8sJ'
)
OSS_ACCESS_KEY_SECRET = os.environ.get(
    "OSS_ACCESS_KEY_SECRET", 'yowuOGi2nYYnrqGpO3qcz94C4brcPp'
)
OSS_ENDPOINT = "oss-cn-shenzhen.aliyuncs.com"
OSS_BUCKET_NAME = "zhijiayun-jiansuo"

# Target size: roughly 60-100 KB (JPEG).
TARGET_MIN_BYTES = 60 * 1024
TARGET_MAX_BYTES = 100 * 1024

# Pillow >= 9.1 exposes the filter under Image.Resampling; older releases
# only have the module-level constant.
try:
    _RESAMPLE = Image.Resampling.LANCZOS
except AttributeError:
    _RESAMPLE = Image.LANCZOS


def _jpeg_bytes(img, quality):
    """Encode *img* as an optimized JPEG at *quality* and return the bytes."""
    buf = BytesIO()
    img.save(buf, format="JPEG", quality=int(quality), optimize=True)
    return buf.getvalue()


def _prepare_rgb(img):
    """Return an RGB copy of *img*, flattening any alpha onto white.

    Palette ("P") images are first expanded to RGBA so that they go through
    the same alpha-flattening path as "RGBA" / "LA" images.
    """
    if img.mode == "P":
        img = img.convert("RGBA")
    if img.mode in ("RGBA", "LA"):
        background = Image.new("RGB", img.size, (255, 255, 255))
        # The last band is the alpha channel; use it as the paste mask.
        background.paste(img, mask=img.split()[-1])
        return background
    return img.convert("RGB")


def compress_image_to_jpeg_range(
    image_source,
    min_bytes=TARGET_MIN_BYTES,
    max_bytes=TARGET_MAX_BYTES,
):
    """Encode an image as JPEG, aiming for a size in [min_bytes, max_bytes].

    Args:
        image_source: A local path, or the raw image data as a bytes-like
            object (``bytes``, ``bytearray`` or ``memoryview``). Anything
            else is handed to ``Image.open`` unchanged.
        min_bytes: Lower bound of the desired encoded size.
        max_bytes: Upper bound of the desired encoded size.

    Returns:
        The JPEG data as ``bytes``. The size is best effort: if the image is
        below ``min_bytes`` even at quality 95 that encoding is returned, and
        if nothing fits under ``max_bytes`` the smallest encoding produced is
        returned.
    """
    if isinstance(image_source, (bytes, bytearray, memoryview)):
        image_source = BytesIO(image_source)
    # _prepare_rgb always builds a new image, so the source can be closed
    # as soon as the RGB copy exists.
    with Image.open(image_source) as img:
        base = _prepare_rgb(img)

    smallest = None  # smallest over-budget encoding seen, used as last resort
    scale = 1.0
    while scale >= 0.05:
        if scale < 1.0:
            w, h = base.size
            cur = base.resize(
                (max(1, int(w * scale)), max(1, int(h * scale))),
                _RESAMPLE,
            )
        else:
            cur = base

        # Binary search over quality for an encoding inside the target range.
        lo, hi = 1, 95
        while lo <= hi:
            mid = (lo + hi) // 2
            data = _jpeg_bytes(cur, mid)
            n = len(data)
            if min_bytes <= n <= max_bytes:
                return data
            if n > max_bytes:
                hi = mid - 1
            else:
                lo = mid + 1

        # Still too large at the lowest quality: shrink the image and retry.
        floor_data = _jpeg_bytes(cur, 1)
        if len(floor_data) > max_bytes:
            if smallest is None or len(floor_data) < len(smallest):
                smallest = floor_data
            scale *= 0.82
            continue

        # Too small even at the highest quality: nothing better is possible.
        ceiling_data = _jpeg_bytes(cur, 95)
        if len(ceiling_data) < min_bytes:
            return ceiling_data

        # The range lies between two adjacent quality steps; scan every
        # quality and keep the encoding closest to the middle of the range.
        best = None
        best_dist = None
        target = (min_bytes + max_bytes) // 2
        for q in range(1, 96):
            data = _jpeg_bytes(cur, q)
            n = len(data)
            if min_bytes <= n <= max_bytes:
                return data
            dist = abs(n - target)
            if best_dist is None or dist < best_dist:
                best_dist = dist
                best = data
        return best

    # Every scale was over budget even at quality 1. Return the smallest
    # encoding produced rather than a full-size quality-85 image, which
    # would be the largest candidate of all.
    return smallest if smallest is not None else _jpeg_bytes(base, 85)


class AliyunOSSUploader:
    """Helper for uploading compressed JPEG images to Aliyun OSS."""

    def __init__(self):
        """Create the OSS auth and bucket handles from the module constants."""
        self.auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
        self.bucket = oss2.Bucket(
            self.auth, f"https://{OSS_ENDPOINT}", OSS_BUCKET_NAME
        )
        self.bucket_name = OSS_BUCKET_NAME
        self.endpoint = OSS_ENDPOINT

    def _put_jpeg(self, object_name, data):
        """Store *data* under *object_name* as image/jpeg and return its URL."""
        headers = {'Content-Type': 'image/jpeg'}
        self.bucket.put_object(object_name, data, headers=headers)
        return f"https://{self.bucket_name}.{self.endpoint}/{object_name}"

    def upload_image(self, local_file_path, content_type='image/jpeg'):
        """Compress a local image to JPEG and upload it.

        Args:
            local_file_path: Path of the image file to upload.
            content_type: Accepted for backward compatibility but not used;
                the payload is always JPEG and is sent as ``image/jpeg``.

        Returns:
            The URL of the uploaded object, built from the bucket name,
            endpoint and object key.
        """
        filename = Path(local_file_path).name
        object_name = f"screenshots/{time.strftime('%Y%m%d_%H%M%S')}_{filename}"
        # The payload is re-encoded as JPEG, so make the key say so.
        if not object_name.lower().endswith(('.jpg', '.jpeg')):
            object_name = object_name.rsplit('.', 1)[0] + '.jpg'
        compressed = compress_image_to_jpeg_range(local_file_path)
        print(object_name)
        return self._put_jpeg(object_name, compressed)

    def upload_from_bytes(self, image_data, filename, content_type='image/jpeg'):
        """Compress in-memory image data to JPEG and upload it.

        Args:
            image_data: Raw image bytes.
            filename: Base name for the object key; ``.jpg`` is always
                appended, so pass it without an extension.
            content_type: Accepted for backward compatibility but not used;
                the payload is always JPEG and is sent as ``image/jpeg``.

        Returns:
            The URL of the uploaded object, built from the bucket name,
            endpoint and object key.
        """
        object_name = f"screenshots/{time.strftime('%Y%m%d_%H%M%S')}_{filename}.jpg"
        compressed = compress_image_to_jpeg_range(image_data)
        return self._put_jpeg(object_name, compressed)


# Usage example
if __name__ == "__main__":
    uploader = AliyunOSSUploader()
    # To upload straight from a path instead:
    #     uploader.upload_image("screenshots/68906391.jpg")
    with open("screenshots/68906391.jpg", "rb") as f:
        image_data = f.read()
    url = uploader.upload_from_bytes(
        image_data, "68906391", content_type='image/jpeg'
    )
    print(f"上传成功:{url}")