Skip to content

Instantly share code, notes, and snippets.

@xurenlu
Created February 28, 2024 06:45
Show Gist options
  • Save xurenlu/214cb6f4632ffbb48f918c9a91c7c1da to your computer and use it in GitHub Desktop.
Save xurenlu/214cb6f4632ffbb48f918c9a91c7c1da to your computer and use it in GitHub Desktop.
grab image information in fc@aliyun
# -*- coding: utf-8 -*-
import oss2
import json
from wand.image import Image
from wand.drawing import Drawing
from wand.color import Color
import base64
import requests
def oss_get_image_data(context, path):
    """Fetch an object from OSS using the credentials carried by ``context``.

    Args:
        context: Invocation context. ``context.credentials`` must expose
            ``accessKeyId``, ``accessKeySecret`` and ``securityToken``;
            ``context.region`` is used to build the internal OSS endpoint.
        path: Object location written as ``"<bucket>/<object key>"``. The
            text before the first ``/`` is the bucket, the rest is the key.

    Returns:
        The value returned by ``oss2.Bucket.get_object`` for the key. Callers
        in this file hand it to ``Image(file=...)``.

    Raises:
        ValueError: If ``path`` is empty or lacks a bucket or an object key.
    """
    # Validate before any network call so a malformed path fails with a clear
    # message instead of an OSS request for an empty bucket or key.
    bucket_name, _, object_key = (path or "").partition("/")
    if not bucket_name or not object_key:
        raise ValueError(
            "invalid OSS path {!r}: expected '<bucket>/<object key>'".format(path)
        )
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    endpoint = "oss-" + context.region + "-internal.aliyuncs.com"
    return oss2.Bucket(auth, endpoint, bucket_name).get_object(object_key)
def oss_save_image_data(context, img_blob, path):
    """Upload ``img_blob`` to OSS using the credentials carried by ``context``.

    Args:
        context: Invocation context. ``context.credentials`` must expose
            ``accessKeyId``, ``accessKeySecret`` and ``securityToken``;
            ``context.region`` is used to build the internal OSS endpoint.
        img_blob: The data passed to ``oss2.Bucket.put_object`` as the body.
        path: Destination written as ``"<bucket>/<object key>"``. The text
            before the first ``/`` is the bucket, the rest is the key.

    Raises:
        ValueError: If ``path`` is empty or lacks a bucket or an object key.
    """
    # Validate before any network call so a malformed target fails with a
    # clear message instead of an OSS request for an empty bucket or key.
    bucket_name, _, object_key = (path or "").partition("/")
    if not bucket_name or not object_key:
        raise ValueError(
            "invalid OSS path {!r}: expected '<bucket>/<object key>'".format(path)
        )
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    endpoint = "oss-" + context.region + "-internal.aliyuncs.com"
    oss_client = oss2.Bucket(auth, endpoint, bucket_name)
    print(bucket_name, object_key)
    oss_client.put_object(object_key, img_blob)
def pinjie(query, context):
    """Place two OSS images side by side on one canvas ("pinjie" = stitch).

    Args:
        query: Request parameters. ``left`` and ``right`` are the
            ``"<bucket>/<key>"`` paths of the two images (both required);
            ``fmt`` is the output format and defaults to ``"jpg"``.
        context: Invocation context, forwarded to the OSS helpers.

    Returns:
        The dict produced by ``make_response`` on success, or the dict
        produced by ``make_error_response`` when any step raises.
    """
    gap = 10  # horizontal space, in pixels, left between the two images
    try:
        left_path = query.get("left")
        right_path = query.get("right")
        # Explicit check rather than ``assert``: assertions are removed under
        # ``python -O`` and an AssertionError carries no message for the client.
        if not left_path or not right_path:
            raise ValueError("both 'left' and 'right' query parameters are required")
        fmt = query.get("fmt", "jpg")
        print("pinjie images: ", left_path, right_path)
        with Image(file=oss_get_image_data(context, left_path)) as left_img:
            with Image(file=oss_get_image_data(context, right_path)) as right_img:
                # Canvas is wide enough for both images plus the gap and as
                # tall as the taller of the two.
                with Image(
                    width=left_img.width + right_img.width + gap,
                    height=max(left_img.height, right_img.height),
                ) as canvas:
                    canvas.format = fmt
                    canvas.composite(left_img, left=0, top=0)
                    canvas.composite(right_img, left=left_img.width + gap, top=0)
                    return make_response(context, query, canvas.make_blob(), fmt)
    except Exception as ex:
        # Route boundary: every failure is reported to the client as a 500.
        return make_error_response(ex)
def get_image_info_from_url(query, context):
    """Download an image over HTTP and report its basic properties.

    Args:
        query: Request parameters; ``url`` (required) is the image address.
        context: Not used by this function; accepted so the signature matches
            the other route functions called from ``handler``.

    Returns:
        A dict with the keys ``width``, ``height``, ``format`` and
        ``transparency`` (``True`` when the image has an alpha channel).

    Raises:
        ValueError: If ``url`` is missing or empty.
        requests.RequestException: If the download fails, times out, or the
            server answers with an error status.
    """
    url = query.get("url")
    if not url:
        raise ValueError("'url' query parameter is required")
    # NOTE(security): ``url`` is supplied by the caller and fetched from inside
    # the function's network; restrict allowed hosts if that is a concern.
    # A timeout keeps an unresponsive server from stalling the invocation.
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail on HTTP error statuses
    with Image(blob=response.content) as img:
        return {
            "width": img.width,
            "height": img.height,
            "format": img.format,
            # ``channel_images`` is keyed by channel names, so testing for the
            # key 'transparency' says nothing about this image; the alpha
            # channel state is what tells whether it can be transparent.
            "transparency": bool(img.alpha_channel),
        }
def watermark(query, context):
    """Stamp a red text watermark near the bottom-left corner of an OSS image.

    Args:
        query: Request parameters. ``img`` is the ``"<bucket>/<key>"`` path of
            the image and ``text`` the watermark text (both required);
            ``fmt`` is the output format and defaults to ``"jpg"``.
        context: Invocation context, forwarded to the OSS helpers.

    Returns:
        The dict produced by ``make_response`` on success, or the dict
        produced by ``make_error_response`` when any step raises.
    """
    try:
        image_path = query.get("img")
        text = query.get("text")
        # Explicit check rather than ``assert``: assertions are removed under
        # ``python -O`` and an AssertionError carries no message for the client.
        if not image_path or not text:
            raise ValueError("both 'img' and 'text' query parameters are required")
        fmt = query.get("fmt", "jpg")
        print("watermark image: {}, text: {}".format(image_path, text))

        # Render the text onto a small transparent PNG. The PNG is kept in
        # memory instead of being written to a fixed /tmp path, so invocations
        # sharing an instance cannot overwrite each other's watermark.
        with Drawing() as draw:
            draw.font = "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc"
            draw.fill_color = Color("red")
            draw.font_size = 24
            with Image(width=200, height=50, background=Color("transparent")) as pic:
                draw.text(10, pic.height // 2, text)
                draw(pic)
                # NOTE(review): kept from the original; wand documents
                # ``alpha_channel`` — confirm that ``alpha`` has any effect.
                pic.alpha = True
                pic.format = "png"
                water_blob = pic.make_blob()

        with Image(file=oss_get_image_data(context, image_path)) as img:
            with Image(blob=water_blob) as water:
                img.format = fmt
                # 0.3 is the transparency argument; the watermark's top edge
                # sits 30 pixels above the bottom of the image.
                img.watermark(water, 0.3, 0, int(img.height - 30))
                return make_response(context, query, img.make_blob(), fmt)
    except Exception as ex:
        # Route boundary: every failure is reported to the client as a 500.
        return make_error_response(ex)
def format(query, context):
    """Convert an OSS image to another image format.

    The function name shadows the builtin ``format()`` inside this module; it
    is kept because ``handler`` routes ``/format`` to it by this name.

    Args:
        query: Request parameters. ``img`` is the ``"<bucket>/<key>"`` path of
            the image and ``fmt`` the target format (both required).
        context: Invocation context, forwarded to the OSS helpers.

    Returns:
        The dict produced by ``make_response`` on success, or the dict
        produced by ``make_error_response`` when any step raises.
    """
    try:
        image_path = query.get("img")
        fmt = query.get("fmt")
        # Explicit check rather than ``assert``: assertions are removed under
        # ``python -O`` and an AssertionError carries no message for the client.
        if not image_path or not fmt:
            raise ValueError("both 'img' and 'fmt' query parameters are required")
        print("format image: {}, fmt: {}".format(image_path, fmt))
        with Image(file=oss_get_image_data(context, image_path)) as img:
            img.format = fmt
            return make_response(context, query, img.make_blob(), fmt)
    except Exception as ex:
        # Route boundary: every failure is reported to the client as a 500.
        return make_error_response(ex)
def gray(query, context):
    """Convert an OSS image to grayscale.

    The output format follows the object's file extension when it is one of
    jpg, jpeg, webp or png (compared case-insensitively); any other extension
    falls back to jpeg.

    Args:
        query: Request parameters; ``img`` (required) is the
            ``"<bucket>/<key>"`` path of the image.
        context: Invocation context, forwarded to the OSS helpers.

    Returns:
        The dict produced by ``make_response`` on success, or the dict
        produced by ``make_error_response`` when any step raises.
    """
    supported_types = {"jpg", "jpeg", "webp", "png"}
    try:
        img_path = query.get("img")
        if not img_path:
            raise ValueError("'img' query parameter is required")
        # The extension is taken from the object key, i.e. the part of the
        # path after the bucket name.
        _, _, object_key = img_path.partition("/")
        extension = object_key.rsplit(".", 1)[-1].lower()
        img_type = extension if extension in supported_types else "jpeg"
        with Image(file=oss_get_image_data(context, img_path)) as img:
            img.format = img_type
            img.transform_colorspace("gray")
            return make_response(context, query, img.make_blob(), img_type)
    except Exception as ex:
        # Route boundary: every failure is reported to the client as a 500.
        return make_error_response(ex)
def make_error_response(ex):
    """Build a plain-text HTTP 500 response describing ``ex``.

    Args:
        ex: The exception to report.

    Returns:
        A dict with ``body`` (``"ERROR: <message>"``), ``headers`` and
        ``statusCode`` 500. When the exception has no message, its class name
        is used so the body is never just ``"ERROR: "``.
    """
    message = str(ex) or type(ex).__name__
    return {
        "body": "ERROR: {}".format(message),
        "headers": {"content-type": "text/plain"},
        "statusCode": 500,
    }
def make_response(context, query, img_blob, img_type):
    """Wrap an encoded image in an HTML page, optionally saving it to OSS.

    Args:
        context: Invocation context; only used when the image is saved.
        query: Request parameters. If ``target`` is present and non-empty the
            image is also uploaded there via ``oss_save_image_data``.
        img_blob: The encoded image bytes.
        img_type: Image format name, used as the media subtype of the data URL.

    Returns:
        A dict with ``statusCode`` 200, a ``text/html`` content type and a
        body that shows the image inline through a base64 data URL.
    """
    target = query.get("target")
    if target:
        oss_save_image_data(context, img_blob, target)
    # "image/jpg" is not a registered media type; JPEG data is "image/jpeg".
    subtype = "jpeg" if img_type.lower() == "jpg" else img_type
    encoded = base64.b64encode(img_blob).decode()
    data_url = "data:image/{};base64,{}".format(subtype, encoded)
    html = (
        '\n<div style="text-align: center;">\n'
        '<img src="{}" alt="Base64 Image" style="display: inline-block;">\n'
        "</div>\n"
    ).format(data_url)
    return {
        "body": html,
        "headers": {"content-type": "text/html"},
        "statusCode": 200,
    }
def handler(event, context):
    """Entry point: decode the HTTP event and dispatch it by request path.

    Args:
        event: JSON document describing the request. The path and method are
            read from ``requestContext.http`` and the parameters from
            ``queryParameters``.
        context: Invocation context, forwarded to the route functions.

    Returns:
        A 405 response for non-GET requests, a plain-text help page for
        ``/``, the route function's result for a known path, and the content
        of ``index.html`` for any other path.
    """
    evt = json.loads(event)
    print(evt)
    http_info = evt["requestContext"]["http"]
    path = http_info["path"]
    method = http_info["method"]
    # ``or {}`` also covers an explicit null, which ``.get(key, {})`` does not.
    query = evt.get("queryParameters") or {}

    if method != "GET":
        # An unsupported method is a client error, not a success.
        return {
            "body": "only support GET method\n",
            "headers": {"content-type": "text/plain", "allow": "GET"},
            "statusCode": 405,
        }
    if path == "/":
        return {
            "body": "/info 获取图片信息\n/gray 获取图片灰度\n/format 格式转换\n/watermark 添加水印\n/pinjie 拼接图片\n",
            "headers": {"content-type": "text/plain"},
            "statusCode": 200,
        }

    # NOTE(review): the /info route returns a bare info dict rather than a
    # body/headers/statusCode response like the other routes — confirm the
    # platform accepts that shape.
    routes = {
        "/info": get_image_info_from_url,
        "/gray": gray,
        "/format": format,
        "/watermark": watermark,
        "/pinjie": pinjie,
    }
    route = routes.get(path)
    if route is not None:
        return route(query, context)

    # Any other path serves the bundled page.
    with open("index.html", encoding="utf-8") as f:
        return {
            "body": f.read(),
            "headers": {"content-type": "text/html"},
            "statusCode": 200,
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment