Pythonで同期関数と非同期関数を統一的に扱う
February 21, 2018
Blockingな関数をasync化する方法
EventLoop.run_in_executor を使いましょう(結論)。
https://docs.python.org/3/library/asyncio-eventloop.html
関数と引数を渡すとExecutorのコンテクストで実行してAwaitable(coroutine)を返してくれます。
第一引数のexecutorは省略可能で、その場合は実行環境に応じたものが自動的に使われます。ThreadベースとProcessベースで性質が違いますので要求に応じて使い分けましょう。
Executorにもよりますが引数はシリアライズ可能なものに限ります。名前付き引数はpartialで事前にbindしておく必要があります。
なおWindows環境特有の問題としてKeyboardInterruptをうまく扱えないみたいな問題があるようなのですがWindowsでプログラをミングすることがないので特に確かめたりとかはしてしません。
参考: => https://gist.github.com/lambdalisue/05d5654bd1ec04992ad316d50924137c
サンプル
おざなりですんません。
チラシの裏レベルの書きなぐりですがいろいろなパターンの実装があります。手元で実行してみるとわかりやすいかとおもいます。
https://gist.github.com/hachibeeDI/f38bc0496fc9e29ddb3f45c08b88432d
import asyncio
from time import sleep, time
def blocker(txt):
    sleep(3)
    return 'aaa' + txt
def async_sample(loop):
    start = time()
    results = loop.run_until_complete(asyncio.gather(
        loop.run_in_executor(None, blocker, '1'),
        loop.run_in_executor(None, blocker, '2'),
        loop.run_in_executor(None, blocker, '3'),
    ))
    time_took = time() - start
    print('time took', time_took)
    print(results)
async def async_sample2(loop):
    start = time()
    f1 = loop.run_in_executor(None, blocker, '1')
    f2 = loop.run_in_executor(None, blocker, '2')
    f3 = loop.run_in_executor(None, blocker, '3')
    print([
        await f1,
        await f2,
        await f3,
    ])
    time_took = time() - start
    print('time took', time_took)
def sync_sample():
    sync_start = time()
    sync_results = [
        blocker('1'),
        blocker('2'),
        blocker('3'),
    ]
    sync_time_took = time() - sync_start
    print(sync_time_took)
    print(sync_results)
from functools import partial
def promisify(loop, executor=None):
    def _promisified(func):
        def __inner(*a, **kw):
            return loop.run_in_executor(executor, partial(func, *a, *kw))
        return __inner
    return _promisified
loop = asyncio.get_event_loop()
@promisify(loop)
def promisified_blocker(txt):
    sleep(3)
    return 'aaa' + txt
async def promisify_sample():
    start = time()
    # f1 = promisified_blocker('1')
    # f2 = promisified_blocker('2')
    # f3 = promisified_blocker('3')
    # print([
    #     await f1,
    #     await f2,
    #     await f3,
    # ])
    print(await asyncio.gather(
        promisified_blocker('1'),
        promisified_blocker('2'),
        promisified_blocker('3'),
    ))
    time_took = time() - start
    print('time took', time_took)
if __name__ == '__main__':
    # async_sample(loop)
    # print('-' * 10)
    # loop.run_until_complete(async_sample2(loop))
    # print('-' * 10)
    # sync_sample()
    #
    loop.run_until_complete(promisify_sample())
    loop.close()
3秒の同期的スリープ関数を三回実行するサンプルでも、executor越しで実行すると並列に行われている様子が観察できるかとおもいます。
run_in_executor はcoroutineを返しますので、async関数内であれば結果をawaitで待つことも可能です。
何故かPythonの非同期関係のサンプルは副作用ベースで記述されているものばかりで返り値を持てないのかなとおもってしまわなくもなくはなくないですがちゃんと持てます。
まとめ
asyncを使いはじめると既存の関数も同様のインターフェースで統一的に並列実行したくなることがかなりあるかとおもいます。
そういうときに便利だとおもいます。
ただし適当に使っても高速化するかどうかは微妙なのでボトルネックに対して使うこと、そしてネックになっているのがIOなのかCPUなのかを検証してから使うようにするといいんじゃないかなーとおもいました。
終了。