非同期タスクキューを使って業務を自動化しまくった話:python-rq


業務の効率化を限界まで進めた話

前回のcopierに続きまして、今回も本業の機械系設計事務所の取り組みについて。業務効率化を限界まで進めるために、自動化タスクを非同期タスクキューを使って処理をできるようにした話です。

とある依頼案件を受けているとき、今まではそれほどボリュームがないものでしたが、今年にかけて急激に増えてきました。案件に対してプロジェクトファイルを用意する(前回のお話です)、案件の成果物を納品する、案件が終わった後事務処理を行う、これらを人力でやっていては追いつかないようになってきて自動化を進めました。

結果的には案件の成果物に関わるメインの業務以外はほとんど自動的に処理できるようになってきました。

bg 70%

こんな絵を想像していて、実際はこんな感じになりました

w:800px

最近だとGoogle Chat上のチャットアプリ(ボット)でタスク処理を走らせています。その時にチャットアプリの制約がありまして、

「同期的に応答するには、Chat アプリが 30 秒以内に応答し、その応答をインタラクションが発生したスペースに投稿する必要があります。それ以外の場合は、Chat アプリは非同期で応答できます。」

Google Chat アプリでのやり取りの受信と返信  |  Google for Developers

Slackの3秒よりも全然緩いけど、非同期前提な様子)

自動化したいタスクの多くは重い処理で、ファイル操作、外部のインターネット経由でAPIアクセス。つまりI/Oバウンド処理になります。

仮に同期処理でチャットアプリを作ると、処理の間は応答できず、応答しない=エラーとして処理されるので都合が良くないです。

そこで、非同期タスクキューという概念を使ってみました。チャットアプリ側で処理するタスクをキューに入れて、同期的な応答はその場ですぐに返答し、ワーカー側で処理を行った結果をチャットアプリに非同期で返すようにしました。

Pythonでの非同期処理の選択肢

調べた限りでいくつかあります。

  • 標準ライブラリ:
    • (並列)threading
    • (並列)multiprocessing
    • (並列)concurrent.futures
    • (非同期)asyncio
    • (並列?3.12から)sub-interpreters
  • メッセージキュー活用: celery, rq, pyzmq(ZeroMQ)
  • クラウドのメッセージング: Cloud Pub/Sub(イベントベースで
  • etc...

asyncioの解説はPython実践レシピがわかりやすいのでオススメです。

Pythonエンジニア育成推進協会監修 Python実践レシピ
B09Q58TJ6N

ASIN : B09Q58TJ6N
Amazonで詳しく見る
Powered by Amazon Quick Affiliate (JP)

こちらはAmazonアソシエイトプログラム参加リンクです

今回はシンプルにタスクを管理できてかつ、チャットアプリを動かしているdockerで動く環境で利用できるものとして、RQ(python-rq)を選びました。

python-rq: https://python-rq.org/

RQは以下の3つの要素で構成されています。

  • アプリ: タスク発行→キューへ入れる→ワーカーから処理結果を受け取る
  • ワーカー: タスクの処理を行う
  • redis: アプリとワーカーの間に入りキューとして利用する。データやオブジェクトの受け渡しも

今回やりたい非同期タスクのざっくりイメージがこちらになります。

Docker環境で動かす

ということで、ちょっぱやでDocckerで用意する場合の例です。参考の記事のほぼそのままで(感謝🙏)これをベースにして社内で動かしています。

参考: Python で分散タスクキュー (RQ 編) #Python - Qiita @hoto17296

以下に載せたコードはGitHubのリポジトリもありますのでぜひ試してみてください。

https://github.com/hrsano645/exam-python-rq-by-docker

各ファイル

Dockerfile

FROM python:3.11
RUN pip install rq

compose.yml

version: '3'
services:
  redis:
    image: redis
  worker:
    build: .
    depends_on:
      - redis
    environment:
      RQ_REDIS_URL: redis://redis
    command: rq worker
    volumes:
      - .:/app
    working_dir: /app
  app:
    build: .
    depends_on:
      - redis
      - worker
    environment:
      RQ_REDIS_URL: redis://redis
    command: python app.py
    volumes:
      - .:/app
    working_dir: /app

tasks.py

import logging

logger = logging.getLogger(__name__)


def add(a, b):
    logger.debug("{} + {} = {}".format(a, b, a + b))
    return a + b

app.py

import os
from time import sleep
import redis
from rq import Queue
from tasks import add

q = Queue(connection=redis.from_url(os.environ.get("RQ_REDIS_URL")))

# 10個のタスクの実行をキューに投げる
tasks = [q.enqueue(add, args=(i, 1)) for i in range(10)]

# タスク実行が完了するまで少し待つ
sleep(1)

# 結果を出力する
print([task.result for task in tasks])

実行方法はdocker-composeで行います。

# シングルワーカー
$ docker-compose up

# 複数ワーカー: 4つのワーカーを起動
$ docker-compose up --scale worker=4
## ログは別途ファイルでみせます

複数ワーカーは単純にプロセスの数を増やせばいいそうです。

Each worker will process a single job at a time. Within a worker, there is no concurrent processing going on. If you want to perform jobs concurrently, simply start more workers.

by RQ: Workers

結果

シングルワーカーの場合

PS C:\Users\hiroshi\Documents\workspace\personal\exam-python-rq-by-docker> docker compose up
[+] Running 3/0
  Container exam-python-rq-by-docker-redis-1   Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-worker-1  Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-app-1     Created                                                                                                                                 0.0s 
Attaching to exam-python-rq-by-docker-app-1, exam-python-rq-by-docker-redis-1, exam-python-rq-by-docker-worker-1
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 00:59:36.233 # WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see https://github.com/jemalloc/jemalloc/issues/1328. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 00:59:36.233 * oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 00:59:36.233 * Redis version=7.2.1, bits=64, commit=00000000, modified=0, pid=1, just started
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 00:59:36.233 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.234 * monotonic clock: POSIX clock_gettime
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.234 * Running mode=standalone, port=6379.
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.234 * Server initialized
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.234 * Loading RDB produced by version 7.2.1
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.234 * RDB age 9 seconds
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.234 * RDB memory usage when created 1.62 Mb
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.235 * Done loading RDB, keys loaded: 48, keys expired: 0.
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.235 * DB loaded from disk: 0.000 seconds
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 00:59:36.235 * Ready to accept connections tcp
exam-python-rq-by-docker-worker-1  | 00:59:36 Worker rq:worker:a6e5a3b574df45cf83bef4ff966eaaee started with PID 1, version 1.15.1
exam-python-rq-by-docker-worker-1  | 00:59:36 Subscribing to channel rq:pubsub:a6e5a3b574df45cf83bef4ff966eaaee
exam-python-rq-by-docker-worker-1  | 00:59:36 *** Listening on default...
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(0, 1) (dea8eb17-721d-4cf9-b4ae-de765a17f592)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (dea8eb17-721d-4cf9-b4ae-de765a17f592)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(1, 1) (6843b1dc-c780-4ce3-a748-8763eedea25e)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (6843b1dc-c780-4ce3-a748-8763eedea25e)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(2, 1) (de2c53c2-4a1f-44e3-bca3-4f33166cd7b9)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (de2c53c2-4a1f-44e3-bca3-4f33166cd7b9)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(3, 1) (be3cdd57-48af-4e8a-828d-6c061cdaa692)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (be3cdd57-48af-4e8a-828d-6c061cdaa692)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(4, 1) (85b6f32d-7a68-4395-9800-1f8eb0684e47)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (85b6f32d-7a68-4395-9800-1f8eb0684e47)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(5, 1) (87de9eee-c66b-4287-bb14-931d3186bafb)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (87de9eee-c66b-4287-bb14-931d3186bafb)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(6, 1) (b4c157f6-0296-4635-a884-9fbcf0558b90)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (b4c157f6-0296-4635-a884-9fbcf0558b90)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(7, 1) (06f4c077-5450-4015-b871-c567713059e2)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (06f4c077-5450-4015-b871-c567713059e2)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(8, 1) (488df06d-73a5-47ec-af79-8193034529d2)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (488df06d-73a5-47ec-af79-8193034529d2)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 00:59:36 default: tasks.add(9, 1) (50a583fa-48dc-47a9-8341-567349a76182)
exam-python-rq-by-docker-worker-1  | 00:59:36 default: Job OK (50a583fa-48dc-47a9-8341-567349a76182)
exam-python-rq-by-docker-worker-1  | 00:59:36 Result is kept for 500 seconds
exam-python-rq-by-docker-app-1     | [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
exam-python-rq-by-docker-app-1 exited with code 0

複数ワーカーの場合。分散している様子がわかります。

PS C:\Users\hiroshi\Documents\workspace\personal\exam-python-rq-by-docker> docker compose up --scale worker=4
[+] Running 6/0
  Container exam-python-rq-by-docker-redis-1   Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-worker-1  Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-worker-4  Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-worker-2  Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-worker-3  Created                                                                                                                                 0.0s 
  Container exam-python-rq-by-docker-app-1     Created                                                                                                                                 0.0s 
Attaching to exam-python-rq-by-docker-app-1, exam-python-rq-by-docker-redis-1, exam-python-rq-by-docker-worker-1, exam-python-rq-by-docker-worker-2, exam-python-rq-by-docker-worker-3, exam-python-rq-by-docker-worker-4
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 01:09:19.733 # WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see https://github.com/jemalloc/jemalloc/issues/1328. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 01:09:19.733 * oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 01:09:19.733 * Redis version=7.2.1, bits=64, commit=00000000, modified=0, pid=1, just started
exam-python-rq-by-docker-redis-1   | 1:C 15 Dec 2023 01:09:19.733 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.733 * monotonic clock: POSIX clock_gettime
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.733 * Running mode=standalone, port=6379.
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * Server initialized
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * Loading RDB produced by version 7.2.1
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * RDB age 5 seconds
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * RDB memory usage when created 1.51 Mb
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * Done loading RDB, keys loaded: 4, keys expired: 0.
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * DB loaded from disk: 0.000 seconds
exam-python-rq-by-docker-redis-1   | 1:M 15 Dec 2023 01:09:19.734 * Ready to accept connections tcp
exam-python-rq-by-docker-worker-4  | 01:09:20 Worker rq:worker:af1ebbcefcb742f6a42fde5fe610546f started with PID 1, version 1.15.1
exam-python-rq-by-docker-worker-4  | 01:09:20 Subscribing to channel rq:pubsub:af1ebbcefcb742f6a42fde5fe610546f
exam-python-rq-by-docker-worker-4  | 01:09:20 *** Listening on default...
exam-python-rq-by-docker-worker-2  | 01:09:20 Worker rq:worker:51a01a96b1164cf1b05bbfd3d08597df started with PID 1, version 1.15.1
exam-python-rq-by-docker-worker-2  | 01:09:20 Subscribing to channel rq:pubsub:51a01a96b1164cf1b05bbfd3d08597df
exam-python-rq-by-docker-worker-2  | 01:09:20 *** Listening on default...
exam-python-rq-by-docker-worker-3  | 01:09:20 Worker rq:worker:d6a122a9de594a72b278a27e20522375 started with PID 1, version 1.15.1
exam-python-rq-by-docker-worker-3  | 01:09:20 Subscribing to channel rq:pubsub:d6a122a9de594a72b278a27e20522375
exam-python-rq-by-docker-worker-3  | 01:09:20 *** Listening on default...
exam-python-rq-by-docker-worker-1  | 01:09:20 Worker rq:worker:797d72990ff0432cb0f9fb7e2688d219 started with PID 1, version 1.15.1
exam-python-rq-by-docker-worker-1  | 01:09:20 Subscribing to channel rq:pubsub:797d72990ff0432cb0f9fb7e2688d219
exam-python-rq-by-docker-worker-1  | 01:09:20 *** Listening on default...
exam-python-rq-by-docker-worker-2  | 01:09:21 default: tasks.add(1, 1) (54d150b2-6037-4c77-aee9-5a2a254f982d)
exam-python-rq-by-docker-worker-3  | 01:09:21 default: tasks.add(2, 1) (9927b349-9a0f-4b96-99bf-891fd530e470)
exam-python-rq-by-docker-worker-1  | 01:09:21 default: tasks.add(3, 1) (f1f2ce3f-12d0-4678-ab9b-5b3de7cf22d2)
exam-python-rq-by-docker-worker-4  | 01:09:21 default: tasks.add(0, 1) (970a5464-500a-44dc-995f-ef9c678541b1)
exam-python-rq-by-docker-worker-1  | 01:09:21 default: Job OK (f1f2ce3f-12d0-4678-ab9b-5b3de7cf22d2)
exam-python-rq-by-docker-worker-3  | 01:09:21 default: Job OK (9927b349-9a0f-4b96-99bf-891fd530e470)
exam-python-rq-by-docker-worker-1  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-3  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-2  | 01:09:21 default: Job OK (54d150b2-6037-4c77-aee9-5a2a254f982d)
exam-python-rq-by-docker-worker-2  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-3  | 01:09:21 default: tasks.add(4, 1) (e202b5de-bfba-43de-b90c-2bb4736035e4)
exam-python-rq-by-docker-worker-1  | 01:09:21 default: tasks.add(5, 1) (e83244a9-1057-4be7-8201-d1a7697a9c24)
exam-python-rq-by-docker-worker-4  | 01:09:21 default: Job OK (970a5464-500a-44dc-995f-ef9c678541b1)
exam-python-rq-by-docker-worker-4  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-2  | 01:09:21 default: tasks.add(6, 1) (c4c8573b-21fc-4e84-a558-c8da040b3d04)
exam-python-rq-by-docker-worker-4  | 01:09:21 default: tasks.add(7, 1) (f03f16f9-f8d5-45a1-8be3-c1cb01fdccc2)
exam-python-rq-by-docker-worker-1  | 01:09:21 default: Job OK (e83244a9-1057-4be7-8201-d1a7697a9c24)
exam-python-rq-by-docker-worker-3  | 01:09:21 default: Job OK (e202b5de-bfba-43de-b90c-2bb4736035e4)
exam-python-rq-by-docker-worker-1  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-3  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-2  | 01:09:21 default: Job OK (c4c8573b-21fc-4e84-a558-c8da040b3d04)
exam-python-rq-by-docker-worker-2  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 01:09:21 default: tasks.add(9, 1) (1b722dde-57e9-40cb-89d0-66b33795b41c)
exam-python-rq-by-docker-worker-3  | 01:09:21 default: tasks.add(8, 1) (ea370d83-8a41-4f06-9ce4-a1c7d7fef2a0)
exam-python-rq-by-docker-worker-4  | 01:09:21 default: Job OK (f03f16f9-f8d5-45a1-8be3-c1cb01fdccc2)
exam-python-rq-by-docker-worker-4  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-3  | 01:09:21 default: Job OK (ea370d83-8a41-4f06-9ce4-a1c7d7fef2a0)
exam-python-rq-by-docker-worker-3  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-worker-1  | 01:09:21 default: Job OK (1b722dde-57e9-40cb-89d0-66b33795b41c)
exam-python-rq-by-docker-worker-1  | 01:09:21 Result is kept for 500 seconds
exam-python-rq-by-docker-app-1     | [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
exam-python-rq-by-docker-app-1 exited with code 0

dockerで動かす時のtips

  • RQはredis(キュー)へタスクを渡すときはpickleを使ってる
    • ワーカー側でもpickleで渡されたオブジェクトが理解できないといけない -> ワーカー側にも同じライブラリをインストールする必要がある
  • 手っ取り早い方法として
    • タスク側もワーカー側も同じ環境=Dockerfileを使う/requirements.txtを使う
    • コード参照や利用するボリュームも同じ箇所を参照すると楽
  • タスクとワーカーを同時に動かすならcomposeが便利

注意点

RQはWindowsは非対応です。ワーカーは理由はunixのプロセス生成関数のforkを利用しているからです。ただWSL経由なら動くそうです。

-> https://python-rq.org/docs/#limitations

まとめ

社内で活用している非同期タスクキューのお話とRQの紹介でした。非同期処理は苦手意識があったのですが、シンプルな方法に慣れておけば、今後処理させたいタスクをまずはこれで作れますし、そのほかのライブラリでもある程度勘所が掴めそうに思います。

今後も本業で扱っている技術情報をまとめていきます!次回はGoogle Chatのチャットアプリについて紹介します。

参考

この記事ははんなりプログラミング:1年の締めくくり!2023年にチャレンジしたことのLT祭で発表した内容を記事にしたものです。

宣伝

弊社の宣伝です! 自動車プレス金型の機械設計を行う設計事務所で、製品や試作品の3Dモデリングのお仕事も受け付けています。

また製造業で上記記事で扱う作業の自動化やその先に繋がるデジタル化、DXに取り組みたい方もご相談承っております。 普段の業務で自動化させてみたいことがあれば、ぜひお気軽にお問い合わせください!