読者です 読者をやめる 読者になる 読者になる

ゆくゆくは有へと

おかゆ/オカ∃/大鹿有生/彼ノ∅有生 の雑記

Pythonの非同期通信(asyncioモジュール)入門を書きました

はじめに

非同期処理のことから知らない人向けにPythonくらいしかろくに知らない人間が書きました。せっかくキーワードが文法に組み込まれたんだから理解したいじゃんか!

asyncioモジュールを使うための基本的な概念が理解できるようになってるはず、多分。

環境としては Python3.5 以上を想定しています。つまり、awaitasync キーワードを使っていきます。

それから、関数やメソッドの仮引数は全く書いてません。必要最低限は文中で説明していますが、より完全に知りたい人は適宜ドキュメントの参照をお願いします。

主役はループちゃん

asyncioの主役はイベントループです。イベントループは頼まれた仕事を順番にどんどん処理していくデキるクールガイです。

本質的に、私たちはイベントループに仕事を関数オブジェクトの形で与えていくだけです。asyncioモジュールの大部分は、私たちが仕事を関数以外の形でイベントループに与えるためにあります(もちろん内部では関数としてイベントループに仕事を与えます)。後々やりますが、ほとんどの場合、私たちはコルーチンという形で仕事を定義し、それを加工してもらいます。

さて、その肝心のイベントループは

loop = asyncio.get_event_loop()

で手に入れます。

仕事の依頼

では、あまりに素朴ですが、関数オブジェクトとして仕事を与える方法は2つあります:

  • call_soonメソッド
  • call_laterメソッド

loop.call_soon(callback, *args) は、イベントループに callback という関数なる仕事をその引数 args とともに渡します。

仕事の開始時間を決めることもできます。loop.call_later(time, callback, *args) は、今から time 秒後の時点に callback を呼ぶようにイベントループにお願いします。ただし、それ以前の仕事に時間がかかっている場合は、この仕事に手を付けるのは開始時間以降になるので注意です。

イベントループの実行

色々とイベントループに仕事を頼みこんだあとは、イベントループを実行して仕事を処理していってもらいましょう!

loop.run_forever()

によって、ループを永遠に実行します。

ただし、永遠に実行されると困るので、なんらかの形でイベントループを止めるようにしましょう。イベントループの停止は loop.stop() で行えます。loop.stop()が呼ばれると、イベントループはその時点に積んである仕事をすべて終わらせてから停止します。ただし、その仕事をこなす途中で新たに追加された仕事は、次回の実行のときまで延期されます。残業中に出てきた新しい仕事は、次の出社日にやるってことですね。

というわけで、

  • loop.call_soon(loop.stop) で最後の仕事としてループの停止をさせる
  • 最後の仕事として渡した関数の中身でloop.stop()を呼ぶ

などをしてループに退社命令を出してあげましょう。停止したループは

loop.close()

で閉じておきましょう。

ドキュメントにある例を少し、ほんの少しだけ変えたものです:

import asyncio
import datetime


def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.call_soon(loop.stop) # 単に loop.stop() でもいい

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

display_dateが2つの仕事をしています:

  • 今の時刻の表示
  • 次の仕事の追加(もう一回 display_date をさせるか、停止させるか)

ループを走らせる時点では、ループには1つの仕事しか与えていませんが、その仕事の中で次の仕事をループに与えているという構図です。

こういう関数を渡すのもありかもしれません:

def display_date2(loop):
    print(datetime.datetime.now())
    try:
        loop.call_later(1, display_date, end_time, loop)
    except KeyboardInterrupt:
        loop.stop()

Ctrl + C でループを邪魔しない限り、永遠にループは時刻を表示しつづけます。

仕事のキャンセル

call_sooncall_laterメソッドは Handleクラスのインスタンスを返します。

その仕事をキャンセルしたい場合は、そのハンドルのcancel()メソッドが使えます。


asyncio を使うにあたって、私たちがすることは

  1. イベントループの取得(loop = asyncio.get_event_loop()
  2. イベントループへのスケジューリング(loop.call_soon(callback)とか)
  3. イベントループの実行(loop.run_forever()など)
  4. イベントループの停止(loop.stop()
  5. イベントループを閉じる(loop.close()

ということです。

なぜわざわざループちゃんに仕事をさせるのか

ここまでにしたことだけを見れば、私たちがふつうに関数を順次実行していくようにコードを書くのと何ら変わりません。call_laterだって、time.sleep()でその時間だけ待ってから次の関数を呼び出せばいいだけじゃないですか?

私たちがやりたいことをループに仕事としてやらせる最大の理由は ループは私たちがコードを順次実行するよりも器用に仕事をしてくれる からです。器用って?

というのも、ループはブロッキングI/Oについて上手くこなす術を知っているんです。

I/Oはともかくとして、ブロッキングというのは日常生活でもよく遭遇し、私たちをイライラさせていることでしょう。もしあなたが何かを待っている間、待つこと以外にろくなことができないとき、あなたはブロッキングされています!市役所の長い長い処理時間の間、あなたにできることと言えば鋭い眼差しで事務員にプレッシャーを与えることくらいでしょう。

でも、もし、処理が終わったらケータイに連絡をくれると言ってくれたらどうでしょう?その間にあなたは今日発売のお気に入りの雑誌を買いに書店へ行き、今月分の家賃を振込み、家になかった文房具を調達し、懸賞ハガキをポストに入れ、喫茶店でコーヒーを飲みながらasyncioに関する記事を書き、……。そう、実に待ち時間を有意義に過ごすことができたことでしょう!

いや、ケータイに連絡をくれなくっても、せめて受付番号を渡してくれれば、仕事を1つこなしては市役所に行って自分の処理が終わってないか確認し、まだなら他のことをしにいくというようなことができますよね。いちいち市役所に行かないといけないのが面倒ですが、やるべきことが市役所のごく近所で済むようなら、まあ十分でしょう。

イベントループは、まさにI/O(と一部の事柄)に関してこのような器用なことができるのです。時間のかかるI/Oの前でいつまでも突っ立ってないで、イベントループはそのチェックを他の仕事の後に回し、次の仕事に取りかかります。

調べたところによると、あえて区別するのであれば、ケータイに連絡してくれるような方式を「非同期I/O」、受付番号をもらって度々確認しにいくような方式を「ノンブロッキングI/O」と呼ぶようです。ちなみに最高にストレスのたまる例のアレは「同期I/O」や「ブロッキングI/O」と呼びます。

asyncioモジュールのイベントループは基本的にはノンブロッキングに仕事をこなします。ある場合は非同期的にも振る舞いますが、いずれにせよイベントループは仕事を無駄なく処理しようとしてくれます。

ループちゃんのスキルを生かす

今までのやり方だとループちゃんの器用さがまるで生かされてないのは明らかです。今までの、関数を仕事として渡すという素朴なやりかたではループちゃんは「空き時間」を上手く見つけられないのです。そこで、言わずもがなですが、次にループちゃんが器用に動き回れるような仕事の作り方を学びましょう。

…と、その準備として、ちょっと不思議なオブジェクト、Futureオブジェクトについて学びましょう。

Future

Futureオブジェクトは高レイヤーな並列処理モジュール concurrent.futures にもあるオブジェクトです。

Futureオブジェクトは、簡単にいえば 結果の代わりとなる オブジェクトです。ある値を渡すような処理があったとして、送信側はとりあえずFutureオブジェクトを受信側に渡し、送信側は処理が終わり次第そのFutureオブジェクトに値を放り込みます。受信側は、Futureオブジェクトの状態が完了になり次第、値を取り出して続きの処理に使うことができます。

イベントループの器用さは確かにこのFutureオブジェクトに因るところが大きいのですが、これについてはもう少し後で見ていきましょう。

さて、Futureオブジェクトに対してできることは、大まかに4つです:

  1. 状態の確認(done, cancelled)
  2. 値の取り出し(result, exception)
  3. 完了(値の設定・キャンセル)(set_result, set_exception, cancel)
  4. コールバックの設定(add_done_callback, remove_done_callback)

Futureインスタンスにとって “done” とは、結果か例外が設定されたことを意味します。コールバックはFutureオブジェクトが完了したときに呼び出されます。

Futureオブジェクトがキャンセルされると、cancelled 属性が True になり、コールバックも呼ばれません。

Futureオブジェクトをより輝かしいものとしてくれているのは、完了時コールバックの機能です!コールバックは、Futureオブジェクトが完了したときに呼ばれる処理です。このコールバックにはFutureオブジェクト自身のみを引数として取るような関数(あるいは一般にそのようなコーラブルオブジェクト)を指定します。

デフォルトなFutureオブジェクトはloop.create_future()で作ります。Futureオブジェクトを作った後、あるいは誰かから渡された後、コールバックを設定してお好みのFutureオブジェクトを作り上げましょう!

Futureオブジェクトを利用した上手い方法は2つあります。ひとつは、定期的にFutureオブジェクトの様子を見てもらうようにイベントループに仕事を頼む方法です。まだ完了していないようであれば、同じ仕事をcall_soonして、しばらくしてからまたイベントループに見てもらうようにします。

もうひとつは、恐らく分かっているとは思いますが、完了時コールバックを使う方法です。Futureオブジェクトに、Futureオブジェクトが完了して以降にしたかった仕事内容をコールバック関数として渡しておきます。そうするとFutureオブジェクトは自分に値が設定されたときに、イベントループにその仕事を頼みます。まさにケータイに連絡するように!イベントループはその仕事をスケジュールに組み込んで、めでたしめでたし。

完了時コールバックを用いた方が断然スタイリッシュにみえますが、前者の方でしかできないこともあります。2つの方法を使い分けられるようにしましょう。

Futureオブジェクトを利用した便利な機能のひとつが、イベントループの実行の仕方のひとつ、loop.run_until_complete(future)です。loop.run_forever()と異なり、引数のFutureオブジェクトが完了するとイベントループが停止します。

似たような挙動のものが自前でも作れそうです。Futureオブジェクトが完了したときにループを停止させればいいんですから、Futureオブジェクトにそのようなコールバックを与えてやればいいですよね:

loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(loop.stop)
... # 準備
loop.run_forever()

実は、loop.run_until_complete()にはまだ隠された機能があるのですが、これは後々明らかになるでしょう。ですから、上のコードはこのメソッドの完全版ではありません。残念!いや、むしろこんなちんけな実装じゃなくて安心?

まずは簡単な例から:

# coding=utf-8

import asyncio


def set_result_to_future(value, future):
    if not future.done():
        future.set_result(value)

loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_soon(set_result_to_future, "DONE!", future)
result = loop.run_until_complete(future)
assert(future.result() == result)
print(result)
loop.close()

あ、実はloop.run_until_complete(future)は完了したFutureオブジェクトの結果の値、すなわちfuture.result()を返してくれます。もちろん今回の場合だと、Futureオブジェクトは手元にあるので、そこから結果を取り出しても構いません。

あんまり面白みがないので、もう少し凝らしてみましょう:

# coding=utf-8

import asyncio
import sys
from random import randint


def random_hit(future, n_upper, count=1, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    value = randint(1, n_upper)
    if value == 1:
        future.set_result(count)
    else:
        count += 1
        loop.call_soon(random_hit, future, n_upper, count, loop)


def _callback(future):
    print("DONE!")

try:
    n = int(sys.argv[1])
except IndexError:
    print("Input an integer.")
    n = int(input())
if n < 1:
  n = 1
loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(_callback)
loop.call_soon(random_hit, future, n)
result = loop.run_until_complete(future)
print(result)
loop.close()

準備としては、

  • n に非負整数をセット
  • ループを取得
  • Futureオブジェクトをつくり、それに_callbackをコールバックとして追加
  • ループにrandom_hit関数を future と n を引数としてともに与える

となります。ループの唯一のお仕事はrandom_hitです。これは乱数値を取得し、それが1ならFutureオブジェクトにcountの値をセット、そうでないならcountを+1し、再度同じ条件でrandom_hitをループに仕事して与えます。

特に、random_hitloopが与えられなかったときに自分でasyncio.get_event_loop()によってイベントループを取得しているところに注意してください。イベントループは1スレッドに1つ与えられるので、ここで得られるイベントループは最初に取得したものと同じです。

ええっと。気持ちはわかります。これだと、単純に関数を再帰させるのと何が違うんだって思いますよね。もうひと工夫してみましょう。

# coding=utf-8

import asyncio
import sys
from random import randint


def random_hit(future, n_upper, count=1, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    value = randint(1, n_upper)
    if value == n_upper:
        print("Hit!")  # add
        future.set_result(count)
    else:
        print("Not yet.")  # add
        count += 1
        loop.call_soon(random_hit, future, n_upper, count, loop)


def eternal_hello(loop):  # add
    print("Hello!")
    loop.call_soon(eternal_hello, loop)


def _callback(future):
    print("DONE!")
    loop = asyncio.get_event_loop()

try:
    n = int(sys.argv[1])
except IndexError:
    print("Input an integer.")
    n = int(input())
if n < 1:
    n = 1

loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(_callback)
loop.call_soon(random_hit, future, n)
loop.call_soon(eternal_hello, loop)  # add
result = loop.run_until_complete(future)
print(result)
loop.close()

# > python future.py 10
# Not yet.
# Hello!
# Hit!
# Hello!
# DONE!
# Hello!
# 2

あまり大きな値は入力しないように(ハロー地獄のはじまりです)。さっきとの違いは、"Hello!“ と挨拶して、またその仕事をイベントループに押し付けるウルサイやつがいることと、あとは少しprint文を増やしたくらいです。この仕事をrandom_hitの仕事依頼の次の行で頼みました。一応言っておくと、複数の仕事がある場合は先に追加したほうから処理していきます(FIFOってやつですね)。

興味深いことに、単にrandom_hit再帰させるのと違って、まだ Hit! していないにもかかわらず、eternal_helloが挨拶してきました!とはいえ、そんな不思議でもありませんよね。仕事の処理はFIFOで、random_hitの次の仕事は先約のeternal_helloの後に行われるはずですから。

Hit! してから実際にイベントループが停止するまでの挙動を少し見てみましょう。Hello! DONE! Hello! と続き、そして結果が出力されています。特に驚くことでもありませんが、Futureオブジェクトに追加した完了時コールバックは、値が設定されたときに、仕事としてイベントループに押し付けられます。run_until_complete()は内部でloop.stop()を呼ぶといいましたが、これもFutureオブジェクトの完了時コールバックとして呼ばれます。つまり、真相はこうなります:

  • Hit!:このときにFutureオブジェクトは完了し、コールバックをイベントループに仕事として渡す
  • 最初のHello!:コールバックより先約のeternal_helloが挨拶し、また挨拶の仕事を押し付ける
  • DONE!:先約の仕事がすべて終わり、完了時コールバックが実行される
  • 一連の完了時コールバックの最後のタイミングでloop.stop()が呼ばれる。以下は残業。
  • 2番目のところで押し付けられた挨拶の仕事を残業としてこなす。このとき生じた新たな挨拶の仕事は次回実行時に持ち越し。
  • 残業おわり!イベントループは無駄な残業をせずにとっとと帰るのであった。

asyncioモジュールが「イベントループに仕事を依頼していく」という形で動いているというのが分かりますね。

最後の例では、ほんの少しだけですがイベントループの器用さが垣間見れました。でもまだ「仕事を中断して別の仕事に取りかかる」という器用さの真髄はお目にかかれていません。それもそのはず。私たちは未だ、そういう風な処理ができるような形の仕事をループちゃんに与えていないからです。そろそろループちゃんの本領が見れそうですよ!

コルーチンとタスク

お次はコルーチンです。コルーチンはジェネレータを定義するときと同じように、関数定義に近い形で書きます。Python3.5から、コルーチン定義の文法が組み込まれました:

async def hello():
  print("Hello!")

このとき、hello()はコルーチンオブジェクトです。ジェネレータと似てますね*1

コルーチンというのは、まさに イベントループに「待ち時間がどこで発生するか」を教えてあげられる 関数みたいなものです。

ジェネレータと似ているところがもう一つあります。ジェネレータではyield from [generator]によって、別のジェネレータをチェーンすることができましたが、同様にコルーチンはawait [coroutine]によってコルーチンをチェーンすることができます。

コルーチンチェーンとしての役割も大事ですが、awaitのより重要な役割は、await [future]という形式です。つまり、awaitの後にはFutureオブジェクトも置けるわけですが、まさにこの形式こそが イベントループにとっての「待ち時間」の印 になります。

順を追って話しましょう。まず、イベントループにコルーチンを仕事として渡すには、関数のときとは異なる手順を踏みます。コルーチンは、FutureのサブクラスであるTaskオブジェクトにくるまれて、イベントループと関わり合いをもちます。Taskオブジェクトの作り方は2つあります:

  • loop.create_task(coroutine)
  • asyncio.ensure_future(coroutine_or_future)

ensure_future にFutureオブジェクトを渡した場合は、そのオブジェクトがそのまま返ってきます(Taskオブジェクトにはならないことに注意!)。

また、loop.run_until_complete()にはコルーチンを渡すこともできます。そのとき、コルーチンは内部でensure_future()によってTaskオブジェクト化されます。

というわけで、Taskオブジェクトを作るときにはコルーチンを渡します。Taskオブジェクトはこのときに「タスクのステップ」という仕事を(今までやってきたようにcall_soonを使って)イベントループに頼みます。

「タスクのステップ」の仕事内容は次の通りです:まず、コルーチンをawait [Future]にぶつかるまで進めます。コルーチンにFutureオブジェクトへのawaitがなかった場合は、ふつうにコルーチンの最後まで実行され、その返り値がTaskオブジェクトの結果として設定され、完了します(TaskはFutureのサブクラスであることに注意)。もし、Futureオブジェクトへのawaitがあった場合、コルーチンはジェネレータのyieldよろしく、その時点で一旦停止します。そして、そのFutureオブジェクトに完了時コールバックとして「タスクのステップ」を追加します。次回、コルーチンは一旦停止したところからFutureオブジェクトの結果を受け取って再開します。

これが「タスクのステップ」という仕事の概要です。この妙技の仕組みは、もうお分かりになるかと思います。まず、Taskオブジェクトは、Futureオブジェクトの完了時コールバックとして「タスクのステップ」が仕事として依頼されない限り、イベントループが触ることはありません!つまり、処理は先延ばしされる わけです。このTaskオブジェクトの続きは、Futureオブジェクトが完了しないとどうすることもできないわけですから、これは実に理にかなっています。

例として、await対象としてよく使われるasyncio.sleepと似たようなコルーチンを書いてみましょう。このコルーチンは、delay 秒だけ動きを止めたのち、resultを返します:

import asyncio

async def sleep(delay, result=None, *, loop=None):
    if delay == 0:
        return result

    if loop is None:
        loop = asyncio.get_event_loop()
    future = loop.create_future()
    h = loop.call_later(delay, future.set_result, result)
    try:
        return (await future)
    finally:
        h.cancel()

このコルーチンは内部でFutureオブジェクトをつくり、delay秒後にresultをそのFutureオブジェクトにセットするようcall_laterしています。それから、await futureすることで、このコルーチン(のTaskオブジェクト)の処理を future が完了するまでイベントループから遠ざけます。futureの完了はもちろん、delay秒後に完了します!

実際にasyncio.sleepを使ってみましょう。以下の例は割とどこででも見るものです。

# coding=utf-8

import asyncio
import sys
from random import randint


async def boring_counting(id_, n):
    for i in range(n):
        print("eh, I am {}. {} :(".format(id_, i))
        await asyncio.sleep(0.2)
    print("FINISH! by {} :)".format(id_))


def get_input():
    try:
        n = int(sys.argv[1])
    except IndexError:
        print("Input an integer.")
        n = int(input())
    if n < 1:
        n = 1
    return n

n = get_input()
loop = asyncio.get_event_loop()
futures = set()
for i in range(5):
    futures.add(boring_counting(i, n))

loop.run_until_complete(asyncio.wait(futures))
loop.close()

# > python future4.py 3
# eh, I am 0. 0 :(
# eh, I am 1. 0 :(
# eh, I am 2. 0 :(
# eh, I am 3. 0 :(
# eh, I am 4. 0 :(
# eh, I am 0. 1 :(
# eh, I am 2. 1 :(
# eh, I am 4. 1 :(
# eh, I am 1. 1 :(
# eh, I am 3. 1 :(
# eh, I am 0. 2 :(
# eh, I am 4. 2 :(
# eh, I am 3. 2 :(
# eh, I am 2. 2 :(
# eh, I am 1. 2 :(
# FINISH! by 0 :)
# FINISH! by 3 :)
# FINISH! by 1 :)
# FINISH! by 4 :)
# FINISH! by 2 :)

asyncio.sleepがつくったFutureオブジェクトが0.2秒後に完了し、それによってコルーチンが再開します。それがn回続いたあと、コルーチンが終わることでTaskオブジェクトが完了します。ちなみに、同じ秒数のcall_laterが複数あった場合、その順序はよく分かりません。ので、カウント1以降は出力の順番はばらばらになっています。

ところで、なんだか見慣れないloop.run_until_complete(asyncio.wait(futures))によってループを実行していますね。

asyncio.wait と asyncio.gather

asyncio.wait(futures)はコルーチンで、デフォルトでは「引数の中のFutureオブジェクトがすべて完了したら完了する」ようなコルーチンです。ですから、今回の実行は「futuresの中のコルーチンが全部終わるまでループを実行し続ける」という条件になっています。ちなみに、waitは他にも「どれか1つでも完了すれば完了」「例外を誰かが吐いたら完了」という風な条件も設定できます。

asyncio.wait

ちなみに、waitに渡す引数の中のコルーチンは、内部でTaskオブジェクトにくるまれるので、

n = get_input()
loop = asyncio.get_event_loop()
futures = set()
for i in range(5):
    futures.add(boring_counting(i, n))  # tweak

loop.run_until_complete(asyncio.wait(futures))
loop.close()

としても大丈夫です。とはいえ、安全のため ensure_future しておきましょう。

waitは返り値として「完了したFutureオブジェクトの集合」と「未完のFutureオブジェクトの集合」のタプルを返してくれます。ただし、集合ですから、渡したシーケンスの順序は保たれないと思った方がいいでしょう。

渡したシーケンスの順序を保ってほしい場合はasyncio.gatherの出番です。この関数は可変個のFutureオブジェクトをとり、その順番を保ったまま、その 結果 を返してくれます。あ、安心してください!もちろんコルーチンを渡しても、Taskオブジェクトにくるんでくれますよ!結果が欲しい場合はgatherのほうがいいかもしれませんね。

asyncio.gather


さっきのコードを実行して気づいてほしいのは、プログラムが終わるまでに 0.2n 秒程度しかかかってないだろうということです!それぞれのコルーチンは各段階で 0.2秒スリープするはずですから、5*0.2n で n秒はかかりそうな気もします。もちろん、そうはならず、イベントループはasyncio.sleepのところでブロッキングされずに他の仕事を進めることができます。それでも 0.2n秒かかってしまうのは、単に 他にすることがなくなっちゃった から待ってるだけです。これがイベントループの器用さの真髄です!

さて、もう一つ例を出してみますね:

# coding=utf-8

import asyncio
import sys
from random import randint


async def random_hit(n_upper, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def _callback(future):
        print("DONE!")
    future = loop.create_future()
    future.add_done_callback(_callback)

    def _random_hit(n_upper, count):
        value = randint(1, n_upper)
        if value == n_upper:
            print("Hit!")
            future.set_result(count)
        else:
            print("Not yet.")
            count += 1
            loop.call_soon(_random_hit, n_upper, count)

    loop.call_soon(_random_hit, n_upper, 1)
    return (await future)


def get_input():
    try:
        n = int(sys.argv[1])
    except IndexError:
        print("Input an integer.")
        n = int(input())
    if n < 1:
        n = 1
    return n

n = get_input()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(random_hit(n))
print(result)

# > python future3.py 10
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Hit!
# DONE!
# 10

これは少し前にやったrandom_hitのコルーチン版です。コルーチンの中に関数定義が入り込んで少し複雑になっているような気もしますが、何より見て欲しいのは、ループ実行の前準備の部分です:

n = get_input()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(random_hit(n))

たった3行!もちろん、nの値の設定を関数化したこともありますが、見てほしいのは断然ループとかの方です。イベントループを取得し、random_hit(n)コルーチンが終わるまでループを走らせる。それしかしていません。

何より変わったのは、Futureオブジェクトをコルーチン内部で用意しているというところです。loop.run_until_completeがコルーチンの値を返してくれるので、Futureオブジェクトをこっち側が保持しておく必要がなくなりました。確かに、Futureオブジェクトを内部で用意するために、内部に関数定義を持たなければならないようにもなりましたが、コルーチン使用者側からすればFutureオブジェクトの世話をしなくてよくなり、ぐっと使いやすくなりました。

薄々感じているだろうとは思いますが、asyncioの実際上の基本ユニットはコルーチンです。waitgatherloop.run_until_completeを見ると、いかにも 「コルーチンを使え」 と言ってますよね。

というわけで、ここからは積極的にコルーチンを使っていくことにしましょう。

I/Oに向かって

asyncio.sleepはよくasyncioモジュールにおけるブロッキングの代わりとして用いられますが、実態はただのcall_laterです。なんというか、騙された気分になりませんか? asyncioモジュールは、その名が本当なら非同期I/Oのためのモジュールなはずです。やっぱりI/Oが実際にどうやって非同期になされるのかというのをみないと、このモジュールを理解したことにはならないでしょう。

Pythonで扱えるI/Oは、そりゃたくさん様々にあります。標準I/Oに、ファイルI/Oに、それから、今から嫌なくらいに見ていくプロセス間通信です。asyncioモジュールがターゲットとしているのはどうやらプロセス間通信のようです。それもそのはずで、そもそもasyncioモジュールの成し遂げるシングルスレッドでの非同期な処理形態というのは、レイテンシのある通信を一度にたくさんしたいときに使えるワザだからです。というわけで、以下ではI/Oの中でも、プロセス間通信について見ていきます(ちなみに、標準I/OやファイルI/Oもちょっと細工をしてあげれば、イベントループはブロッキングせずに上手く扱ってくれるようになります。いや、んー、まあ、この場合はイベントループがすごいというよりは、マルチスレッドがすごいということになるんですけど、詳しくは後で話します)。

まずは明かりを灯そう:ソケット

低水準な通信方法にはいくつかありますが、ここではソケットだけを取り上げることにします(他のについてはドキュメントを見てください)。

とはいえ、そもそも、ソケットのノンブロッキングな扱い方といえば、既にselectorを用いた方法があります!イベントループがこれを利用しない手はありませんし、実際そうしています。

イベントループは自前のselectorを持っており、かなり頻繁にそれをチェック(もちろんノンブロッキングで) しています*2。低水準なソケットの操作は、そのselectorにソケットを登録し、読み書き可能になり次第、登録時に渡しておいたコールバック関数で処理するようにします。もちろん、コールバックはそのソケットでの読み書きが可能になり次第、イベントループに仕事として追加されます!

ソケットをループ(のselector)に登録するには次のメソッドを用います:

  • loop.add_reader(fd, callback, *args)
  • loop.remove_reader(fd)
  • loop.add_writer(fd, callback, *args)
  • loop.remove_writer(fd)

fd はファイルディスクリプタですが、ソケットをそのまま渡しても大丈夫です。

肝心の読み書きは次のメソッドで行います。きっとコールバック内で使うことでしょう:

  • loop.sock_recv(sock, nbytes)
  • loop.sock_sendall(sock, data)

ソケットからデータが欲しいようなコルーチンは、Futureオブジェクトを作り、そのFutureオブジェクトをコールバックの引数として渡しておき、コールバックの中で読み出したデータをFutureオブジェクトにセットしてもらえばよさそうです。もちろん、コルーチンはそのFutureオブジェクトを使って await しておきます。

ソケットの接続に関しても、ループのselectorに監視してもらうことができます:

  • loop.sock_connect(sock, address)add_writerのコールバック内で
  • loop.sock_accept(sock)add_readerのコールバック内で

低水準ソケットを使いたいけれど、ソケット自体についてあまり知らないという人は、HOWTOをひとまず読むことをおすすめします。もしその気がないのであれば、低水準においてノンブロッキングI/Oが可能だということを知ってほしかっただけなので、以下は飛ばしてもらっても構いません。

簡単には、クライアントソケット(IPv4, TCP)の場合、

  • ソケットを作る(sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • ノンブロッキングにする(sock.setblocking(False)
  • ループのselectorに登録(loop.add_writer(sock, callback, *args)
  • コールバック関数中で、ソケットをつなぐ(loop.sock_connect(sock, address)

をするだけで、サーバソケット(IPv4, TCP)の場合は、

  • ソケットを作る(sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • ノンブロッキングにする(sock.setblocking(False)
  • bind(sock.bind(address, port)
  • listen(sock.listen(backlog)
  • ループのselectorに登録(loop.add_reader(sock, callback, *args)
  • コールバック関数中で、ソケットを受け入れ(loop.sock_accept(sock)

とすればいいでしょう。

補足:その他のソケット種

上ではIPv4アドレスファミリーを用いたTCPタイプのソケットを例に挙げましたが、IPv6Bluetoothアドレスファミリーを用いたり、UDPタイプや生タイプのソケットをつくることもできます。

socketモジュールにあるAF_*定数はアドレスファミリーを表し、SOCK_*定数はソケットタイプを表します。より詳しい内容はドキュメントをどうぞ。

もっと高水準を!:トランスポート/プロトコル

すべてのプロセス間通信はソケットに通ずる……とまでは言いませんが、大抵のプロセス間通信は確かにソケットでできそうです。おさらいすると、「コールバックを持たせてループに預ける」だけ。非常に簡単な方法で非同期なプロセス間通信が実現できたわけですが、接続、読み書き、切断という一連の処理をそれぞれのコールバック関数に書くことになるので、あるソケットの一生を追うのにお目々が行ったり来たり、少し面倒くさそうです。通信の一連の流れ、要はどんなコールバックがそれぞれの段階で呼ばれるのか、というのは一箇所に書いてあったほうが他の誰かにとって、そして未来のあなたにとっても親切でしょう。

というわけで、asyncioモジュールのトランスポート/プロトコルの出番です。プロトコルは、ある通信オブジェクト(≒ソケット)の一連の流れ(接続、読み書き、切断)をメソッドとして記述するためのクラスで、まさに今の私たちが求めているものです。

簡単に説明すると、プロトコルは所定のデータ様式を扱うための処理(所定のデータ様式のアレコレ)を担い、トランスポートは実際のI/Oやバッファリング(I/O現場のアレコレ)を担います。プロトコルファクトリーは各接続ごとにプロトコルインスタンスを新たに作って提供してくれます。

トランスポート

asyncioモジュールは現在 TCP, UDP, SSL, サブプロセスパイプのトランスポートを実装しています。通信において実際に相手とやりとりするのがトランスポートの役目です。各種トランスポートはそれぞれの通信の形態に対応しており、相応のメソッド群を持ち合わせています。

以下にトランスポートを列挙しますが、後で見るように、asyncioモジュールには、基本的なプロトコルがそれぞれのトランスポートを備えたようなものとして用意されているので、基本的にはトランスポートを直接触るようなことはないかと思います。

  • BaseTransport:基底トランスポート
  • ReadTransport:読み込み専用
  • WriteTransport:書き込み専用
  • DatagramTransportUDPに対応
  • BaseSubprocessTransport:サブプロセス通信に対応

プロトコル

asyncioモジュールには、そんな大がかりなプロトコルはなく、それぞれ異なるトランスポートをもつプロトコルとしてサブクラスが発展しているだけです(とはいえ、これらのおかげでトランスポートを直接触らずに済みますから、十分ありがたいものです)。

具体的にどういった様式をデータ単位として扱っていくかということについては、独自にプロトコルを組んでいくことになります。みんなにとって必要なものは恐らく誰かが作っていることでしょう!サードパーティに期待しましょう(たとえば、HTTPについては aiohttpモジュールがあります)。

一番よく使うであろう、Protocolクラスについて見ていきます。さあ、ソケットの一生を書き込みましょう:

  • BaseProtocol.connection_made(transport):コネクション作成時に呼び出し
  • BaseProtocol.connection_lost(exc):コネクションが失われた、閉じられたときに呼び出し
  • Protocol.data_received(data):データを受信したときに呼ばれる。
  • Protocol.eof_received():相手が送信するデータがないことを伝えてきたときに呼ばれる。

その他のプロトコルについてはドキュメントを見てください(プロトコル)。

コネクションの作成

コネクションは用いるソケットタイプやアドレスファミリーによって異なりますが、基本的には:

  • loop.create_connection()SOCK_STREAMTCP)を用いる通信
  • loop.create_datagram_endpoint()SOCK_DGRAMUDP)を用いる通信

を使います。引数には色々と渡せますが、基本的には プロトコルファミリー(あるいは単に、無引数呼び出しでプロトコルインスタンスを返すようなコーラブルオブジェクト)、ホストアドレスポート番号 を順に渡します。

このメソッドはコルーチン関数です。得られるコルーチンは接続を試し、接続が成功すると、プロトコルconnection_madeメソッドが呼ばれ、最終的に(トランスポート, プロトコル)のタプルが返ります。

一方、待ちうけコネクション(サーバー)には次のメソッドを使います。引数は上のメソッドと同じです:

  • loop.create_server

このメソッドもご期待通り、コルーチン関数です。得られるコルーチンは接続が成功するとサーバーオブジェクトを返します。サーバーオブジェクトには次のことができます:

  • server.close():非同期に、待機中のソケットを閉じ、サーバーを停止する
  • server.wait_closed()close()が完了するまで待機する(もちろん非同期に!)
  • sockets:サーバーが待機するソケットのリスト

ドキュメントが気になるなら、ここに置いておくのでご自由に:

流れとしては、プロトコルを渡してコネクションをつくり、あとはプロトコルに書いた通りにソケットが人生(ソケット生)を送るだけです。簡単なやりとり(たとえば、echo)なら、プロトコルの基本メソッドだけで完結すると思います。

データの送信

あ、データの送信について触れていませんでした!データの送信にはトランスポートのメソッドを用います:

  • TCPwrite(data)writelines(list_of_data)write_eof()
  • UDPsendto(data)

「トランスポートには触らないでいいって言ってたのに!」って? それは申し訳ない。でも「直接は触らない」って言ったように、私たちはプロトコル手袋をつけた状態でしかトランスポートを触ることはないはずです。プロトコルconnection_madeはトランスポートを引数として渡されて呼ばれるので、もしそのソケットでデータを送るようなことがあるなら、そのときにトランスポートに首輪でもはめておいてください。えっと、プロトコルインスタンスの属性にでもしておいてくださいってことです。

プロトコルオブジェクトに送信用のメソッドを定義しておくのはいい手かもしれません。特に、渡されたデータに何か小細工してから相手に送りつけたり、その荷物にドラッグが仕込まれてないかを確認したりしたい場合はそうするべきです(ですが、もっと良い方法があって、それはすぐ後で出会うことになります。もう少しだけお待ちを!)。

プロトコルの典型的な実装などについては、標準モジュールのブロッキングプロトコルを参考にするといいかもしれません(インターネットプロトコル)。

ドキュメントの例を載せます:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

これが、あるソケットの数奇な人生の記録です。接続時に陽気に ‘Hello World!’ と相手に送りつけ、受け取ったデータを単に出力します。なんて退屈な人生なんだ……。接続が切れると、彼はループを止め、やがてプログラムも終了します。

ループを2回に分けて回していることに注意してください。こういうのは今までに見ませんでしたね。loop.run_until_completeは、渡されたFutureオブジェクトに例外が設定されたときはその例外をあげます。2回に分けて実行することで、万が一接続がうまく行かなかったときにループがそのまま永遠に走り続けるというようなことを防止できるわけですね!

続いて、サーバー側も見てみましょう。さっきと同じで、ドキュメントにある例を載せます:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

本当に単純なエコーサーバーです。接続すると、接続した相手の名前をプリントします。相手からデータがきたら、それをプリントし、トランスポートを使ってそのまま相手に送り返します(エコーだからね!)。そのあと、トランスポートを閉じ、これ以上このコネクションでやり取りをしない旨を伝えます。ただし、サーバー自体が終わるわけではないことに注意してください!サーバー自体は KeyboardInterrupt があがるまでは動作しつづけます。もし新たにクライアントが接続してきたら、さっきと同じことを繰り返してまた接続を切ります。なんて愛想の悪いコールセンターなんだ!

Ctrl+C によってループが止まれば、サーバーに閉じるよう命じ、ちゃんと閉じるまでループを回してから、ループも閉じます。めでたしめでたし。

※ ん?「Ctrl+C を押しても停止しない」? それは困りました…。もしかして Windowsを使ってませんか? Windowsだと、何も仕事をしていない待機中のイベントループにシグナルが上手く伝わらないようです。イベントループが仕事をしている間に Ctrl+C を押すと上手く反応してくれるので、応急処置としては、絶対に終わらない仕事を与えておく方法があります。あくまで応急処置として、ですけど。

もっともっと高水準を!:ストリーム

トランスポート/プロトコルを用いることで、単純にソケットをイベントループに渡すよりかは断然見通しがよくなりました! ソケットによる他のプロセスとのデータのやり取りはもうバッチリなはずです。でも、確かに本当にI/Oのまわりの装備は整ってきましたが、I/Oとコッチ側のやりとりのことを忘れていました。トランスポート/プロトコルは確かにソケットを繋いで、いい感じに相手からデータを受け取ってくれます。それはいいんですが、そのデータはどうやったら受け取れるんでしょう? 私たちがそのデータを受け取ったのは、そのデータを使って何かしたかったからにちがいありません。

加えて、トランスポートは相手方にバイナリデータを送ってくれますが、トランスポートに渡すために、毎回自分の手で(より抽象的な)データをプロトコルが規定する様式のバイナリデータに変換するのも面倒です。トランスポートが郵便屋さんだとしたら、小包の重さや寸法を調べてくれたり、必要な切手を貼ってくれたりする郵便局の受付係がいたら、とても楽だと思いませんか?

それではご登場願いましょう! ストリームはまさに配達業者と私たちを結ぶ窓口のような役割を果たしてくれます。私たちはStreamReader窓口で「荷物ください」と催促し、StreamWriter窓口に行って「この荷物をお願いします」とお願いすればいいのです。もちろん、各窓口は特定の配達業者と提携していて、私たちの荷物に適切な処置をした上で、その業者に荷物を流してくれます。

有り体に言えば、トランスポート/プロトコルよりも高水準なI/Oシステムです。

トランスポート/プロトコルとストリームの依存関係

ストリームはトランスポート/プロトコルの上になりたつものなので、そこには依存関係があります。簡単にいえば、個々のオブジェクトを作る際の引数に何を渡すのかという関係です。モジュールの構造の関係か、論理上の階層(ストリームがトランスポート/プロトコルより上という階層)と少しだけ変わっています:

  • StreamReader:「最下層」のオブジェクトです。
  • StreamReaderProtocol:ストリーム用にカスタムされたプロトコルで、引数にStreamReaderを取ります。
  • StreamWriter:「最上層」のオブジェクトです。トランスポート、プロトコル、リーダーの3つを引数にとります。

分かるとは思いますが、必要な読み書きストリームをつくる際には

  1. StreamReaderオブジェクトを手に入れる
  2. それを使ってStreamReaderProtocolオブジェクトを手に入れる
  3. それら(+トランスポート)を使ってStreamWriterを手に入れる

という流れになります。とはいえ、StreamReaderだけを作っても特にできることは何もないし、まともに使いたいのであればStreamReaderProtocolは必須です。基本的に、ストリームを使うときはStreamWriterまで作るはずなので、なんであれ、トランスポート/プロトコルは必須です。

StreamReader と StreamReaderProtocol

まずは「受信窓口」から始めましょう。StreamReaderはその内部に倉庫(バッファ(bytearray))をもっていて、StreamReaderProtocolは自分が受け取ったデータをどんどんそこに放り込んでいきます。私たちはStreamReaderのインターフェースを通して倉庫に保管してあるデータを受け取れます。

  • read():引数にバイト数を渡せる。デフォルトではEOFまで読み込む
  • readexactly():引数にバイト数を渡して、厳密にそれだけ読み込む
  • readline():改行がくるまで読み取る。EOFがきた場合は残りをすべて読み込む
  • readuntil()separator引数に渡したバイト文字列までを読み込む

何より大事なのは、これらがコルーチンだということです!これらの読み取りメソッドはバッファに十分なデータがたまるまで、受け渡しを await してくれます。

私たちとStreamReaderを繋ぐのは上で見たread系のメソッドです。配達業者側、つまりトランスポート/プロトコルとはfeed系のメソッドで荷物を受け付けます:

  • feed_data()
  • feed_eof()

これらのメソッドを私たちが直接触ることはないでしょう。StreamReaderProtocolStreamWriterが内部でこれらのメソッドを良しなに使っているので、これらのサブクラスとして実装すればいいだけです。

StreamReaderProtocolインスタンスをつくる際に、第二引数としてclient_connected_cbにコールバック(関数でもコルーチンでも)を渡すことができます。このコールバックは、接続したときに呼ばれるもので、connection_madeメソッドと似た役割をもちます。

StreamWriter

StreamWriter はほとんどトランスポートのラッパです;write(data)writelines(list_of_data)write_eof()メソッドをインターフェースとしてもっており、同名のトランスポートのメソッドを呼び出します。

コネクション

デフォルト(TCP)のStreamReader, StreamWriter, StreamReaderProtocol で十分な場合は、open_connectionが使えます。これはcreate_connectionのラッパーです。実際のソースコード(を若干改変したもの)を見てみます:

async def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

案の定、まずリーダーを手に入れ、それを使ってプロトコルを手に入れます。それから、create_connection して、トランスポートを手に入れます。最後に、その3つをすべて使って、ライターを手に入れ、リーダーとライターを返します。簡単ですね!

同様に、create_serverのラッパ的存在としてstart_serverもあります:

async def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return (await loop.create_server(factory, host, port, **kwds))

まあ、やってることは大したことないですよね。

これでだいたい終わり

asyncioモジュールの根幹であるイベントループからはじまり、Futureオブジェクト、コルーチン、トランスポート/プロトコル、ストリームとみてきました。asyncioモジュールの大体の要素は見てきたので、モジュールのドキュメントの大半は見たことあるものになってるかと思います。

サブプロセスだけはやってないですが、ここまで理解していればそんな大したことないでしょう(憶測)。それではレッツエンジョイ asyncio!

あっ、run_in_executor書いてない・・・・・・

*1:実際、Python3.4以前ではコルーチンはジェネレータの形で書いていましたから、当然といえば当然ですね。

*2:具体的にどのくらい頻繁かというと、新しい仕事を始める前に毎回チェックしているようです。