SPARKCREATIVE Tech Blog

https://www.spark-creative.jp/

Python gif 作ってみた

こんにちは!!!クライアントエンジニアの小林です。

今回はPythonでGIFを作っていきます。

作業環境

windows 10
visual studio code
python 3.10

概要

はてなブログにGIFを投下するには10.0 MBに抑えないといけません。

筆者はAdobeが無料で提供しているAdobe ExpressというツールでGIFを作成しています。

無料で圧縮率もそこそこ高くてとっても優秀なのですが、その圧縮率が故にディザノイズが割と出やすいという代償があります。

見るに堪えないほどの品質ではなかったので、いつか自分でGIF変換作ろう程度の優先度から早数か月、今に至りました。

ライブラリの機能を組み合わせてtkinterで簡易的にツール化していきます。

GIF作成

GIFの出力にはpillowを使用します。

imageioは引数指定のフォーマットが変なキーワードで好みじゃないのと、内部で余計な変換処理をしていそうな雰囲気があり、それを把握するのが面倒だったので、pillowの方を採用しました。

シンプルなサンプル

とりあえず動けばいいやというものです。

import cv2
import numpy as np
from PIL import Image
from typing import Union, Optional
from pathlib import Path


def image_quantize(
    image:np.ndarray,
    colors:int=256,
    method:int=Image.Quantize.MEDIANCUT,
    dither:int=Image.Dither.NONE,
    mode:str="RGB",
) -> Image.Image:
    """画像の量子化

    Args:
        image (np.ndarray): 入力画像(RGB配置を想定)
        colors (int, optional): 減色後の色数. Defaults to 256.
        method (int, optional): 量子化の種類. Defaults to Image.Quantize.MEDIANCUT.
        dither (int, optional): ディザの種類. Defaults to Image.Dither.NONE.
        mode (str, optional): 出力結果の形式. Defaults to "RGB".

    Returns:
        Image.Image: 量子化された画像
    """
    image:Image.Image = Image.fromarray(image, mode=mode)
    image = image.quantize(colors=colors, method=method, dither=dither)
    return image.convert(mode)


class WithVideoCapture:
    """with対応なcv2.VideoCapture
    """
    def __init__(self, filename:str) -> None:
        """コンストラクタ

        Args:
            filename (str): 動画のファイルパス
        """
        self.cap = cv2.VideoCapture(filename)

    def __enter__(self) -> "WithVideoCapture":
        # NOTE: 一部プロパティはreadし終えてから読み込むと0.0になります。
        self.__width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.__height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.__fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.__frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.__frame = -1
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.cap.release()

    @property
    def width(self) -> int:
        """画像の横幅を取得

        Returns:
            int: 画像の横幅
        """
        return self.__width

    @property
    def height(self) -> int:
        """画像の縦幅を取得

        Returns:
            int: 画像の縦幅
        """
        return self.__height

    @property
    def fps(self) -> float:
        """フレームレートを取得

        Returns:
            float: フレームレート
        """
        return self.__fps

    @property
    def frames(self) -> int:
        """総フレーム数を取得

        Returns:
            int: 総フレーム数
        """
        return self.__frames

    @property
    def retval(self) -> bool:
        """最後のread結果を取得

        Returns:
            bool: 読込に成功した場合はTrueを返します。
        """
        try:
            return self.__retval
        except Exception:
            return False

    @property
    def image(self) -> Optional[np.ndarray]:
        """最後のreadで読み込んだ画像を取得

        Returns:
            Optional[np.ndarray]: 読み込みに成功した場合は画像データを返します。
        """
        try:
            return self.__image
        except Exception:
            return None

    @property
    def frame(self) -> int:
        """現在のフレーム数を取得

        readするたびに進みます。

        Returns:
            int: 現在のフレーム数、又は一度もreadしていない場合は-1を返します。
        """
        try:
            return self.__frame
        except Exception:
            return -1

    def read(self) -> bool:
        """読込

        Returns:
            bool: 読込結果
        """
        self.__retval, self.__image = self.cap.read()
        self.__frame += 1
        return self.retval


def simple_sample(filename:Union[str, Path]) -> None:
    """サンプル

    Args:
        filename (Union[str, Path]): 動画の入力パス
    """
    if isinstance(filename, str):
        filename:Path = Path(filename)

    # *.mp4以外の入力をする予定がない
    if filename.suffix != ".mp4":
        return None

    # read errorはwithで無視されちゃうのでここで止めておく
    if not filename.is_file():
        return None

    input_path = filename
    output_path = filename.with_suffix(".gif")

    with WithVideoCapture(str(input_path)) as cap:
        images:list[Image.Image] = []

        # apply scale.
        width = cap.width - cap.width//2
        height = cap.height - cap.height//2

        while cap.read():
            image = cap.image
            image = cv2.resize(image, (width, height), interpolation=cv2.INTER_AREA)
            image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGB)
            image = image_quantize(image, method=Image.Quantize.MEDIANCUT)
            images.append(image)

    # 読込欠損発生
    if len(images) != cap.frames:
        return None

    images[0].save(str(output_path), save_all=True, append_images=images[1:], optimize=False, duration=1.0 / cap.fps * 1000.0, loop=0)


if __name__ == "__main__":
    simple_sample(r"F:\GIFConverter\sample\41.mp4")

圧縮率の比較

結果は変換元で使用されている色範囲に依存するため、参考程度な比較表です。

品質を落としてまで圧縮する予定はないので色範囲は256色固定、ディザは適用しても見た目が変わらなかったのでNONEを選択して、量子化の方法と画像のスケールのみ差分を出しています。

八分木が最強っぽい気がしますが、階調がはっきりしてしまうため、GIF変換する動画次第では向いていないかもですね。そして画像サイズを小さくするのが結局強いという。

method scale file size
Adobe Express 100% (1024×576) 6,623 KB
MEDIANCUT 100% (1024×576) 11,532 KB
FASTOCTREE 100% (1024×576) 6,740 KB
MEDIANCUT 75% (768×432) 7,249 KB
FASTOCTREE 75% (768×432) 4,190 KB
MEDIANCUT 50% (512×288) 3,775 KB
FASTOCTREE 50% (512×288) 2,219 KB
Adobe Express (100%)
Median cut (75%)
Fast octree (75%)

画像の量子化とGIF出力はスレッド化

処理に時間がかかるものはスレッド化します。

opencvやpillowの高速化には、使用する関数や開発環境にも依存しますが、基本的にはスレッド化よりもプロセス化の方が効果的です。本来であればプロセス化で進めたかったのですが、tkinterでざっくり組み込んでいたらちょっと不都合と衝突したため、妥協してスレッド化にしています。

from pathlib import Path
import multiprocessing as mp
import threading as th
from typing import Union, Optional, Callable, Any
import cv2
import numpy as np
from PIL import Image
from dataclasses import dataclass


__all__ = [
    "WithVideoCapture",
    "GIFConverter",
]


class WithVideoCapture:
    """with対応なcv2.VideoCapture
    """
    def __init__(self, filename:str) -> None:
        """コンストラクタ

        Args:
            filename (str): 動画のファイルパス
        """
        self.cap = cv2.VideoCapture(filename)

    def __enter__(self) -> "WithVideoCapture":
        # NOTE: 一部プロパティはreadし終えてから読み込むと0.0になります。
        self.__width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.__height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.__fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.__frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.__frame = -1
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.cap.release()

    @property
    def width(self) -> int:
        """画像の横幅を取得

        Returns:
            int: 画像の横幅
        """
        return self.__width

    @property
    def height(self) -> int:
        """画像の縦幅を取得

        Returns:
            int: 画像の縦幅
        """
        return self.__height

    @property
    def fps(self) -> float:
        """フレームレートを取得

        Returns:
            float: フレームレート
        """
        return self.__fps

    @property
    def frames(self) -> int:
        """総フレーム数を取得

        Returns:
            int: 総フレーム数
        """
        return self.__frames

    @property
    def retval(self) -> bool:
        """最後のread結果を取得

        Returns:
            bool: 読込に成功した場合はTrueを返します。
        """
        try:
            return self.__retval
        except Exception:
            return False

    @property
    def image(self) -> Optional[np.ndarray]:
        """最後のreadで読み込んだ画像を取得

        Returns:
            Optional[np.ndarray]: 読み込みに成功した場合は画像データを返します。
        """
        try:
            return self.__image
        except Exception:
            return None

    @property
    def frame(self) -> int:
        """現在のフレーム数を取得

        readするたびに進みます。

        Returns:
            int: 現在のフレーム数、又は一度もreadしていない場合は-1を返します。
        """
        try:
            return self.__frame
        except Exception:
            return -1

    def read(self) -> bool:
        """読込

        Returns:
            bool: 読込結果
        """
        self.__retval, self.__image = self.cap.read()
        self.__frame += 1
        return self.retval


@dataclass
class GIFExportInfo:
    """GIF変換、出力情報
    """
    input_path:str
    output_path:str
    resize:float
    quantize_method:int
    play_speed:float
    num_workers:int

    def __post_init__(self) -> None:
        if isinstance(self.input_path, Path):
            self.input_path = str(self.input_path)

        if isinstance(self.output_path, Path):
            self.output_path = str(self.output_path)

        self.num_workers = max(1, self.num_workers)


class GIFConverter:
    """GIF変換と出力
    """
    # GIF変換可能な拡張子
    SUPPORT_SUFFIXES = tuple([".mp4", ".avi"])

    def __init__(self) -> None:
        """コンストラクタ
        """
        # GIF変換と出力を行うスレッド
        self.thread:th.Thread = None

    @staticmethod
    def is_valid_path(in_path:Any, is_file:bool, suffix:Optional[Union[str, tuple[str, ...]]]) -> bool:
        """パスの有効性チェック

        Args:
            in_path (Any): チェックするパス
            is_file (bool): ファイルが存在するか確認します。
            suffix (Optional[Union[str, tuple[str, ...]]]): 拡張子を限定する場合に指定します。

        Returns:
            bool: _description_
        """
        if isinstance(in_path, str) and in_path != "":
            in_path:Path = Path(in_path)
        elif not isinstance(in_path, Path):
            return False

        # ファイルの有無が指定されている場合.
        if is_file and not in_path.is_file():
            return False

        # 拡張子が指定されている場合は含まれるか.
        if suffix is not None and in_path.suffix not in suffix:
            return False

        return True

    def is_thread_ready(self) -> bool:
        """スレッドの立ち上げ準備が整っているかを取得します。

        Returns:
            bool: スレッドを立ち上げられる場合はTrueを返します。
        """
        if self.thread is None:
            return True

        # スレッドが生きている場合は準備完了していません。
        return not self.thread.is_alive()

    def export(
        self,
        input_path:Union[Path, str],
        output_path:Union[Path, str],
        resize:float,
        quantize_method:int,
        play_speed:float,
        num_workers:int,
        quantized_callback:Optional[Callable[[list[Image.Image], float], None]] = None,
        exported_callback:Optional[Callable[[bool], None]] = None,
    ) -> bool:
        """[MainThread] GIF変換と出力

        Args:
            input_path (Union[Path, str]): 動画の入力パス
            output_path (Union[Path, str]): GIFの出力パス
            resize (float): リサイズ
            quantize_method (int): 量子化の種類
            play_speed (float): 再生速度
            num_workers (int): 量子化処理のワーカー数
            quantized_callback (Optional[Callable[[list[Image.Image], float], None]], optional): 量子化後のコールバック. Defaults to None.
            exported_callback (Optional[Callable[[bool], None]], optional): GIF出力後のコールバック. Defaults to None.

        Returns:
            bool: スレッドの立ち上げに成功した場合はTrueを返します。
        """
        # 最後に実行したGIF変換が完了しているか
        if not self.is_thread_ready():
            return False

        # 入力先の有効性を確認
        if not GIFConverter.is_valid_path(input_path, True, self.SUPPORT_SUFFIXES):
            return False

        # 出力先の有効性を確認
        if not GIFConverter.is_valid_path(output_path, False, ".gif"):
            return False

        # GIF変換スレッドの立ち上げ
        self.thread = th.Thread(
            target=self.thread_export,
            args=(
                GIFExportInfo(
                    input_path,
                    output_path,
                    resize,
                    quantize_method,
                    play_speed,
                    num_workers,
                ),
                quantized_callback,
                exported_callback,
            ),
            daemon=True,
        )
        self.thread.start()

        return True

    def thread_export(
        self,
        info:GIFExportInfo,
        quantized_callback:Optional[Callable[[list[Image.Image], float], None]] = None,
        exported_callback:Optional[Callable[[bool], None]] = None,
    ) -> None:
        """[Thread-N] GIF変換と出力

        Args:
            info (GIFExportInfo): GIF変換、出力情報
            quantized_callback (Optional[Callable[[list[Image.Image], float], None]], optional): 量子化後のコールバック. Defaults to None.
            exported_callback (Optional[Callable[[bool], None]], optional): GIF出力後のコールバック. Defaults to None.
        """
        # 量子化スレッドの入出力用
        input_queue = mp.Queue()
        output_queue = mp.Queue()

        # 量子化スレッドリスト
        # NOTE: 元はプロセスだけどtkinterとの相性問題でスレッドに変更.
        threads:list[th.Thread] = []

        # 動画読込
        with WithVideoCapture(info.input_path) as cap:
            # プロセスの立ち上げ
            for _ in range(info.num_workers):
                if info.resize != 1.0:
                    thread = th.Thread(
                        target=GIFConverter.update_image_scale_quantize,
                        args=(
                            input_queue,
                            output_queue,
                            int(cap.width * info.resize),
                            int(cap.height * info.resize),
                            cv2.INTER_AREA,
                            info.quantize_method,
                        ),
                        daemon=True,
                    )
                else:
                    thread = th.Thread(
                        target=GIFConverter.update_image_quantize,
                        args=(
                            input_queue,
                            output_queue,
                            info.quantize_method,
                        ),
                        daemon=True,
                    )
                thread.start()
                threads.append(thread)

            # 画像をキューに突っ込む
            while cap.read():
                input_queue.put((cap.frame, cv2.cvtColor(cap.image, cv2.COLOR_BGRA2RGB)))

        # 量子化スレッドの終了合図を送信
        for _ in range(info.num_workers):
            input_queue.put(None)

        try:
            # 画像をフレーム順に並び替え
            images:list[tuple[int, Image.Image]] = [output_queue.get() for _ in range(cap.frames)]
            images:list[Image.Image] = [image for _, image in sorted(images, key=lambda values: values[0])]

            # 画像1枚あたりの表示時間
            duration = 1.0 / (cap.fps * info.play_speed) * 1000.0

            # 量子化完了後のコールバックが登録されている場合は、画像と表示時間を渡します。
            if quantized_callback is not None:
                quantized_callback(images, duration)

            # GIF出力
            images[0].save(info.output_path, save_all=True, append_images=images[1:], optimize=False, duration=duration, loop=0)

            # 出力結果
            is_success = True
        except Exception:
            is_success = False

        # GIF出力後のコールバックが登録されている場合は、成否を渡します。
        if exported_callback is not None:
            exported_callback(is_success)

    @staticmethod
    def update_image_scale_quantize(
        input_queue:mp.Queue,
        output_queue:mp.Queue,
        width:int,
        height:int,
        interpolation:int,
        quantize_method:int,
    ) -> None:
        """画像のリサイズと量子化

        処理された画像は出力キューに積まれます。

        Args:
            input_queue (mp.Queue): 画像の入力キュー
            output_queue (mp.Queue): 画像の出力キュー
            width (int): リサイズ後の横幅
            height (int): リサイズ後の縦幅
            interpolation (int): リサイズの補間方法
            quantize_method (int): 量子化の種類
        """
        while True:
            # Noneを受け取るまで仕事をします.
            if (values:=input_queue.get()) is None:
                return

            # リサイズ後に量子化を行います.
            frame, image = values
            image = cv2.resize(image, (width, height), interpolation=interpolation)
            image = GIFConverter.image_quantize(image, method=quantize_method)

            # 加工結果を送信します.
            output_queue.put((frame, image))

    @staticmethod
    def update_image_quantize(
        input_queue:mp.Queue,
        output_queue:mp.Queue,
        quantize_method:int,
    ) -> None:
        """画像の量子化

        Args:
            input_queue (mp.Queue): 画像の入力キュー
            output_queue (mp.Queue): 画像の出力キュー
            quantize_method (int): 量子化の種類
        """
        while True:
            # Noneを受け取るまで仕事をします。
            if (values:=input_queue.get()) is None:
                return

            # 量子化を行います。
            frame, image = values
            image = GIFConverter.image_quantize(image, method=quantize_method)

            # 加工結果を送信します。
            output_queue.put((frame, image))

    @staticmethod
    def image_quantize(
        image:np.ndarray,
        colors:int=256,
        method:int=Image.Quantize.MEDIANCUT,
        dither:int=Image.Dither.NONE,
        mode:str="RGB",
    ) -> Image.Image:
        """画像の量子化

        Args:
            image (np.ndarray): 入力画像(RGB配置を想定)
            colors (int, optional): 減色後の色数. Defaults to 256.
            method (int, optional): 量子化の種類. Defaults to Image.Quantize.MEDIANCUT.
            dither (int, optional): ディザの種類. Defaults to Image.Dither.NONE.
            mode (str, optional): 出力結果の形式. Defaults to "RGB".

        Returns:
            Image.Image: 量子化された画像
        """
        image:Image.Image = Image.fromarray(image, mode=mode)
        image = image.quantize(colors=colors, method=method, dither=dither)
        return image.convert(mode)

tkinterはスレッドセーフではない

少し寄り道します。

tkinterGUI更新をメインスレッドで行なっているため、重い処理は別スレッドに逃して、GUI遅延を極力出さないようにするのがおそらく一般的です。

そのため、よくスレッド化をしますが、tkinter自体はスレッドセーフな設計にはなっていないため、GUI更新に使用する要素に関しては、各々でスレッドセーフな仕組みを入れなくてはいけません。

地味に面倒です。

今回でいうと画像の量子化結果のプレビュー表示まわりにロックを入れています。メインと合わせても2スレッドで且つ一般的なフレームレートである16 ~ 33ms程度であれば競合することも珍しい&例外処理で握り潰せますが、最近スレッド化も面白いなと思い始めたので、勉強ついでにロック入れてみました。

注意点としてメインスレッドでロックを取得するまで待機すると永遠と抜けられなくなります。

import tkinter as tk
import ttkbootstrap as ttk
from ttkbootstrap.constants import *

import threading as th
from PIL import Image, ImageTk
from itertools import cycle

from editor.grid_util import *


__all__ = [
    "ImageView",
]


class ImageView:
    def __init__(
        self,
        master:tk.Misc,
        column:int,
        row:int,
        columnspan:int = 1,
        sticky:str = NSEW,
    ) -> None:
        grid = GridUtil(column, row, columnspan, sticky)

        self.master = master

        self.lock = th.Lock()

        self.image:ImageTk.PhotoImage = None
        self.image_cylcle:cycle = None
        self.duration:int = 33

        self.view = ttk.Label(master)
        self.view.grid(column=grid.column, row=grid.row, columnspan=grid.columnspan, sticky=grid.sticky)

        self.update_image()

    def set_images(self, images:list[Image.Image], duration:float) -> None:
        """[Thread-N] ビューに使用する画像をセット

        Args:
            images (list[Image.Image]): 画像リスト
            duration (float): 画像1枚あたりの表示時間(ミリ秒)
        """
        with self.lock:
            self.image_cylcle = cycle([ImageTk.PhotoImage(image) for image in images])
            self.duration = int(duration)

    def update_image(self) -> None:
        """[MainThread] ビューの画像を更新
        """
        if self.image_cylcle is None:
            self.master.after(33, self.update_image)
        elif not self.lock.acquire(True, 0.0):
            # NOTE: メインスレッドでロック取得待機するとメインスレッドが止まるので流します。
            self.master.after(33, self.update_image)
        else:
            self.image:ImageTk.PhotoImage = next(self.image_cylcle)

            self.lock.release()

            self.view.configure(image=self.image)
            self.master.after(self.duration, self.update_image)

ログを仕込んで見てみる

実行スレッドと時間を可視化するLoggerを仕込んで見ると、ロックしないと危なそうな雰囲気で読み書きが発生していることが汲み取れます。

ロックなし
ロックあり

こういうログを眺めるのも面白いですね。
以上、寄り道でした。

アプリケーション

ざっくりtkinterで組み込みます。

エディタ系のコードはOCRやTTS開発で温めているGUIライブラリとその設計があるため、そこから一部拝借して突貫工事をしています。

あとおそらく爆速操作をすると一部ボタンの有効性が戻らない不具合があるはずですが、人間業じゃないor操作運が悪いということで考慮しないこととします。

スレッドはデーモン化しているため、途中で強制終了した場合でも道連れになるはずと、信じています。


Adobe Expressと比較

Adobe Express (100%)
Median cut (50%)

リサイズ100%の圧縮率は悪くなっていますが、顕著なディザパターンは抑えられています。

このあたりは10.0 MBに収まるように適当にリサイズして使うことにします。

おわり!!!

お疲れさまでした!!!

既存サービスを利用するのが安定ですが、自作はロマン溢れる良い響きです。

Pythonさんは豊富なライブラリによりそれが比較的安価に叶えられます。

それ故にアレコレ作りたい欲が収まらない問題もありますが、のんびり欲を満たしていきます。