ゆくゆくは有へと

おかゆ/オカ∃/大鹿有生/彼ノ∅有生 の雑記

Rustのmoduleの話

`mod` and the Filesystem - The Rust Programming Language

mod client {            //
    fn connect(){}      // -> client.rs
}                       //

mod network {           //
    fn connect(){}      //  ------ -> network/mod.rs
                        // |
    mod server {        // |
        fn connect(){}  // | -> network/server.rs
    }                   //  ------
}                       //

Rustのメモ

“ownership” について

「所有権」と訳されているけれど、"ownership" のニュアンスを完全に捉えきれてるのかよく分からない。

英辞郎を引いてみると

  1. 所有権、所有、所有者[持ち主]であること
  2. 責任感、当事者意識

とあり、1の意義はOKとして、2の意義って「所有権」って語から察せるものなんだろうか?おかゆが単に世間知らずで、「所有権」にそういうニュアンスを感じることができないだけ?

“ownership” で大事なニュアンスはむしろ 2 のほうなんじゃないかって理解が浅いながら思うのだけど、つまり、「メモリ解放の責任をもつ」ってことかなって。ムーブは「責任の押し付け」で、借用はその責任を被らずに値を使わせていただくことのできる仕組みということかなって。

特に訳さず、「オーナーシップ」でいいような気もする。「ムーブ」もムーブだし。

IntoIterator

Python だと Iterable で、for .. in .. にぶち込むと勝手に iter してくれてるのが懐かしく思える。

このあたりのムーブセマンティクスの挙動は勉強しないとなあ

ライフタイム

どうでもいいんだけど、"ownership" は訳して、"lifetime" は「ライフタイム」なんやね。

ライフタイムよく分かんね~~

ライフタイムより:

struct Foo<'a> {
    x: &'a i32,
}

fn main() {
    let x;                    // -+ xがスコープに入る
                              //  |
    {                         //  |
        let y = &5;           // ---+ yがスコープに入る
        let f = Foo { x: y }; // ---+ fがスコープに入る
        x = &f.x;             //  | | ここでエラーが起きる
    }                         // ---+ fとyがスコープから出る
                              //  |
    println!("{}", x);        //  |
}                             // -+ xがスコープから出る

エラーが起きるのは分かるけど、ライフタイム明記せなコンパイラちゃんは認識できんの?と思ってライフタイム指定消してみよ:

struct Foo {
    x: &i32,
}

fn main() {
    let x;                    // -+ xがスコープに入る
                              //  |
    {                         //  |
        let y = &5;           // ---+ yがスコープに入る
        let f = Foo { x: y }; // ---+ fがスコープに入る
        x = &f.x;             //  | | ここでエラーが起きる
    }                         // ---+ fとyがスコープから出る
                              //  |
    println!("{}", x);        //  |
}                             // -+ xがスコープから出る

にしてみたら、

error[E0106]: missing lifetime specifier
 --> src\main.rs:2:8
  |
2 |     x: &i32,
  |        ^ expected lifetime parameter

error: aborting due to previous error
error: Could not compile `practice`.

あ、なるほど、指定子ないぞ!って怒られるのか。

内部構造にどっかの参照(借用)をもってるから、ってことかしら…。まあそうだろうけど…。

コンパイラちゃんはどこを見て指定子が必要なところかどうかを見極めるんだろ?
「内部構造に借用の型を持ってたら」かな?「外側(ガワ)にライフタイムを伝播」させるために、ライフタイム指定子が必要?

関数アノテーションでのライフタイム省略については、「ライフタイムの関わる出力は、大抵入力のライフタイムと関連あるやろ(鼻ホジ」ってことでしょう…。

“mutablity” について

ミュータビリティ

一方で、Rustで「イミュータブル(immutable)」について言及するとき、変更不可能であることを意味しない: 「外側のミュータビリティ(exterior mutability)」を表します。例として、Arc を考えます:

うーん。原文は

However, when we say something is ‘immutable’ in Rust, that doesn’t mean that it’s not able to be changed: we are referring to its ‘exterior mutability’ that in this case is immutable. Consider, for example, Arc:

「しかしながら、Rustで私たちが「あるモノがイミュータブル(immutable)だ」と言うとき、別にそれが変化し得ないと言っているわけではありません。そうではなく、「外側のミュータビリティ」に関してイミュータブルだと言っているわけです。」

これくらいの意味だろう…。

主成分分析に関するメモ

多変量解析入門(C・チャットフィールド, A・J・コリンズ)4.4 p.69あたり

「例えば、1つの変数がすべての他の変数よりもずっと大きい分散を持っていると、相関構造がどのようなものであっても、その変数が共分散行列の第1主成分の中で際立った重みを持つことになる。反面、すべての変数が単位分散を持つように尺度化されていると、第1主成分は全く違った性質を示すであろう。このような事情から、一般論として、例えばすべての変数が百分率である場合とか、すべて同一の座標系で測られている場合とかのように、すべての変数が「大体似通った」分散を持っているのでなければ、PCAを実行することはほとんど意味がないと言えよう。」

「この尺度構成の問題を処理するために、通常共分散行列ではなく”相関”行列を分析する方法が行われている。…(中略)…。この返還によってすべての変数が単位分散を持つように尺度化され、一応重要度に差異がなくなる。しかしこの尺度構成の手順は、ある程度任意性を持っており、データいかんに左右され、しかも尺度化の問題を解決するものというよりも回避するためのものといえる。すべての変数が同等に重要であると考えられない場合には、相関行列の分析はやらないほうがよい。また相関行列を分析した場合、2つ以上の標本についてのPCAの結果の比較がより困難になる。」

「相関行列の主成分は、変数をもとの座標系に変換しなおすと直交しなくなるであろう。これはp空間で直交する2直線の線形変換が、一般には新しい直交する2直線を与えないからであって、これも尺度構成がなによりも重要であることを示唆している。」

「成分は変数の線形変換の不変量ではない。」

化学者のための多変量解析(尾崎幸洋ほか)

3.3.2 p.51あたり

「旧来の定量化法においては、それぞれの成分に対して代表的なピークを選定し、その吸収強度の時系列変化と対応する濃度推移(目的変量)を使って検量を行う。また各成分に特徴的なピークを複数個選択し、MLRを用いて検量を行うこともできるが、着目した波長以外での情報の欠落や、選択したピークと他のピークとのオーバーラップなど、しばしばやっかいな問題に遭遇する。PLSではPCRの場合と同様に、全波長のスペクトルあるいは一定波長域のスペクトルを用いて回帰を行う。PLSの特徴は、説明変量X側と目的変量Y側の双方に主成分分析(PCA)による直交分解を適用することにある。」

※ PLS: Partial Least Squares

簡単には、多次元の説明変量で、多次元の目的変量を説明(回帰)する1つの方法で、それぞれの空間で主成分軸を決定してしまって、それぞれの主成分スコアを座標として回帰を行う方法らしい。

(いちほ)

Rインスコした

環境

Win10 64bit

R言語インストール(2017年 Windows) - Qiita

死ぬな

ああどうか お願い 死なないで
この季節は ぼくの死んだ季節で
君の死ぬ季節じゃない
君はもっと辺りが死に満ちた
ふさわしい時に死ぬべきだ
こんなみんなが盛り上がってる
そんなときに死ぬんじゃない
そこで僕は死んだけど
君が死ぬ道理はさ
これっぽっちもないんだって
ねえ だからお願い 死なないで
あなたはまだ生きていて
繰り返す季節の中で
僕は死んだ僕のことと
そして死んだ君のことを
思い出さなくちゃいけなくなって
ほらみろ とても忙しくなるだろう
だからお願い 死なないで

Pythonの非同期通信(asyncioモジュール)入門を書きました

はじめに

非同期処理のことから知らない人向けにPythonくらいしかろくに知らない人間が書きました。せっかくキーワードが文法に組み込まれたんだから理解したいじゃんか!

asyncioモジュールを使うための基本的な概念が理解できるようになってるはず、多分。

環境としては Python3.5 以上を想定しています。つまり、awaitasync キーワードを使っていきます。

それから、関数やメソッドの仮引数は全く書いてません。必要最低限は文中で説明していますが、より完全に知りたい人は適宜ドキュメントの参照をお願いします。

主役はループちゃん

asyncioの主役はイベントループです。イベントループは頼まれた仕事を順番にどんどん処理していくデキるクールガイです。

本質的に、私たちはイベントループに仕事を関数オブジェクトの形で与えていくだけです。asyncioモジュールの大部分は、私たちが仕事を関数以外の形でイベントループに与えるためにあります(もちろん内部では関数としてイベントループに仕事を与えます)。後々やりますが、ほとんどの場合、私たちはコルーチンという形で仕事を定義し、それを加工してもらいます。

さて、その肝心のイベントループは

loop = asyncio.get_event_loop()

で手に入れます。

仕事の依頼

では、あまりに素朴ですが、関数オブジェクトとして仕事を与える方法は2つあります:

  • call_soonメソッド
  • call_laterメソッド

loop.call_soon(callback, *args) は、イベントループに callback という関数なる仕事をその引数 args とともに渡します。

仕事の開始時間を決めることもできます。loop.call_later(time, callback, *args) は、今から time 秒後の時点に callback を呼ぶようにイベントループにお願いします。ただし、それ以前の仕事に時間がかかっている場合は、この仕事に手を付けるのは開始時間以降になるので注意です。

イベントループの実行

色々とイベントループに仕事を頼みこんだあとは、イベントループを実行して仕事を処理していってもらいましょう!

loop.run_forever()

によって、ループを永遠に実行します。

ただし、永遠に実行されると困るので、なんらかの形でイベントループを止めるようにしましょう。イベントループの停止は loop.stop() で行えます。loop.stop()が呼ばれると、イベントループはその時点に積んである仕事をすべて終わらせてから停止します。ただし、その仕事をこなす途中で新たに追加された仕事は、次回の実行のときまで延期されます。残業中に出てきた新しい仕事は、次の出社日にやるってことですね。

というわけで、

  • loop.call_soon(loop.stop) で最後の仕事としてループの停止をさせる
  • 最後の仕事として渡した関数の中身でloop.stop()を呼ぶ

などをしてループに退社命令を出してあげましょう。停止したループは

loop.close()

で閉じておきましょう。

ドキュメントにある例を少し、ほんの少しだけ変えたものです:

import asyncio
import datetime


def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.call_soon(loop.stop) # 単に loop.stop() でもいい

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

display_dateが2つの仕事をしています:

  • 今の時刻の表示
  • 次の仕事の追加(もう一回 display_date をさせるか、停止させるか)

ループを走らせる時点では、ループには1つの仕事しか与えていませんが、その仕事の中で次の仕事をループに与えているという構図です。

こういう関数を渡すのもありかもしれません:

def display_date2(loop):
    print(datetime.datetime.now())
    try:
        loop.call_later(1, display_date, end_time, loop)
    except KeyboardInterrupt:
        loop.stop()

Ctrl + C でループを邪魔しない限り、永遠にループは時刻を表示しつづけます。

仕事のキャンセル

call_sooncall_laterメソッドは Handleクラスのインスタンスを返します。

その仕事をキャンセルしたい場合は、そのハンドルのcancel()メソッドが使えます。


asyncio を使うにあたって、私たちがすることは

  1. イベントループの取得(loop = asyncio.get_event_loop()
  2. イベントループへのスケジューリング(loop.call_soon(callback)とか)
  3. イベントループの実行(loop.run_forever()など)
  4. イベントループの停止(loop.stop()
  5. イベントループを閉じる(loop.close()

ということです。

なぜわざわざループちゃんに仕事をさせるのか

ここまでにしたことだけを見れば、私たちがふつうに関数を順次実行していくようにコードを書くのと何ら変わりません。call_laterだって、time.sleep()でその時間だけ待ってから次の関数を呼び出せばいいだけじゃないですか?

私たちがやりたいことをループに仕事としてやらせる最大の理由は ループは私たちがコードを順次実行するよりも器用に仕事をしてくれる からです。器用って?

というのも、ループはブロッキングI/Oについて上手くこなす術を知っているんです。

I/Oはともかくとして、ブロッキングというのは日常生活でもよく遭遇し、私たちをイライラさせていることでしょう。もしあなたが何かを待っている間、待つこと以外にろくなことができないとき、あなたはブロッキングされています!市役所の長い長い処理時間の間、あなたにできることと言えば鋭い眼差しで事務員にプレッシャーを与えることくらいでしょう。

でも、もし、処理が終わったらケータイに連絡をくれると言ってくれたらどうでしょう?その間にあなたは今日発売のお気に入りの雑誌を買いに書店へ行き、今月分の家賃を振込み、家になかった文房具を調達し、懸賞ハガキをポストに入れ、喫茶店でコーヒーを飲みながらasyncioに関する記事を書き、……。そう、実に待ち時間を有意義に過ごすことができたことでしょう!

いや、ケータイに連絡をくれなくっても、せめて受付番号を渡してくれれば、仕事を1つこなしては市役所に行って自分の処理が終わってないか確認し、まだなら他のことをしにいくというようなことができますよね。いちいち市役所に行かないといけないのが面倒ですが、やるべきことが市役所のごく近所で済むようなら、まあ十分でしょう。

イベントループは、まさにI/O(と一部の事柄)に関してこのような器用なことができるのです。時間のかかるI/Oの前でいつまでも突っ立ってないで、イベントループはそのチェックを他の仕事の後に回し、次の仕事に取りかかります。

調べたところによると、あえて区別するのであれば、ケータイに連絡してくれるような方式を「非同期I/O」、受付番号をもらって度々確認しにいくような方式を「ノンブロッキングI/O」と呼ぶようです。ちなみに最高にストレスのたまる例のアレは「同期I/O」や「ブロッキングI/O」と呼びます。

asyncioモジュールのイベントループは基本的にはノンブロッキングに仕事をこなします。ある場合は非同期的にも振る舞いますが、いずれにせよイベントループは仕事を無駄なく処理しようとしてくれます。

ループちゃんのスキルを生かす

今までのやり方だとループちゃんの器用さがまるで生かされてないのは明らかです。今までの、関数を仕事として渡すという素朴なやりかたではループちゃんは「空き時間」を上手く見つけられないのです。そこで、言わずもがなですが、次にループちゃんが器用に動き回れるような仕事の作り方を学びましょう。

…と、その準備として、ちょっと不思議なオブジェクト、Futureオブジェクトについて学びましょう。

Future

Futureオブジェクトは高レイヤーな並列処理モジュール concurrent.futures にもあるオブジェクトです。

Futureオブジェクトは、簡単にいえば 結果の代わりとなる オブジェクトです。ある値を渡すような処理があったとして、送信側はとりあえずFutureオブジェクトを受信側に渡し、送信側は処理が終わり次第そのFutureオブジェクトに値を放り込みます。受信側は、Futureオブジェクトの状態が完了になり次第、値を取り出して続きの処理に使うことができます。

イベントループの器用さは確かにこのFutureオブジェクトに因るところが大きいのですが、これについてはもう少し後で見ていきましょう。

さて、Futureオブジェクトに対してできることは、大まかに4つです:

  1. 状態の確認(done, cancelled)
  2. 値の取り出し(result, exception)
  3. 完了(値の設定・キャンセル)(set_result, set_exception, cancel)
  4. コールバックの設定(add_done_callback, remove_done_callback)

Futureインスタンスにとって “done” とは、結果か例外が設定されたことを意味します。コールバックはFutureオブジェクトが完了したときに呼び出されます。

Futureオブジェクトがキャンセルされると、cancelled 属性が True になり、コールバックも呼ばれません。

Futureオブジェクトをより輝かしいものとしてくれているのは、完了時コールバックの機能です!コールバックは、Futureオブジェクトが完了したときに呼ばれる処理です。このコールバックにはFutureオブジェクト自身のみを引数として取るような関数(あるいは一般にそのようなコーラブルオブジェクト)を指定します。

デフォルトなFutureオブジェクトはloop.create_future()で作ります。Futureオブジェクトを作った後、あるいは誰かから渡された後、コールバックを設定してお好みのFutureオブジェクトを作り上げましょう!

Futureオブジェクトを利用した上手い方法は2つあります。ひとつは、定期的にFutureオブジェクトの様子を見てもらうようにイベントループに仕事を頼む方法です。まだ完了していないようであれば、同じ仕事をcall_soonして、しばらくしてからまたイベントループに見てもらうようにします。

もうひとつは、恐らく分かっているとは思いますが、完了時コールバックを使う方法です。Futureオブジェクトに、Futureオブジェクトが完了して以降にしたかった仕事内容をコールバック関数として渡しておきます。そうするとFutureオブジェクトは自分に値が設定されたときに、イベントループにその仕事を頼みます。まさにケータイに連絡するように!イベントループはその仕事をスケジュールに組み込んで、めでたしめでたし。

完了時コールバックを用いた方が断然スタイリッシュにみえますが、前者の方でしかできないこともあります。2つの方法を使い分けられるようにしましょう。

Futureオブジェクトを利用した便利な機能のひとつが、イベントループの実行の仕方のひとつ、loop.run_until_complete(future)です。loop.run_forever()と異なり、引数のFutureオブジェクトが完了するとイベントループが停止します。

似たような挙動のものが自前でも作れそうです。Futureオブジェクトが完了したときにループを停止させればいいんですから、Futureオブジェクトにそのようなコールバックを与えてやればいいですよね:

loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(loop.stop)
... # 準備
loop.run_forever()

実は、loop.run_until_complete()にはまだ隠された機能があるのですが、これは後々明らかになるでしょう。ですから、上のコードはこのメソッドの完全版ではありません。残念!いや、むしろこんなちんけな実装じゃなくて安心?

まずは簡単な例から:

# coding=utf-8

import asyncio


def set_result_to_future(value, future):
    if not future.done():
        future.set_result(value)

loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_soon(set_result_to_future, "DONE!", future)
result = loop.run_until_complete(future)
assert(future.result() == result)
print(result)
loop.close()

あ、実はloop.run_until_complete(future)は完了したFutureオブジェクトの結果の値、すなわちfuture.result()を返してくれます。もちろん今回の場合だと、Futureオブジェクトは手元にあるので、そこから結果を取り出しても構いません。

あんまり面白みがないので、もう少し凝らしてみましょう:

# coding=utf-8

import asyncio
import sys
from random import randint


def random_hit(future, n_upper, count=1, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    value = randint(1, n_upper)
    if value == 1:
        future.set_result(count)
    else:
        count += 1
        loop.call_soon(random_hit, future, n_upper, count, loop)


def _callback(future):
    print("DONE!")

try:
    n = int(sys.argv[1])
except IndexError:
    print("Input an integer.")
    n = int(input())
if n < 1:
  n = 1
loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(_callback)
loop.call_soon(random_hit, future, n)
result = loop.run_until_complete(future)
print(result)
loop.close()

準備としては、

  • n に非負整数をセット
  • ループを取得
  • Futureオブジェクトをつくり、それに_callbackをコールバックとして追加
  • ループにrandom_hit関数を future と n を引数としてともに与える

となります。ループの唯一のお仕事はrandom_hitです。これは乱数値を取得し、それが1ならFutureオブジェクトにcountの値をセット、そうでないならcountを+1し、再度同じ条件でrandom_hitをループに仕事して与えます。

特に、random_hitloopが与えられなかったときに自分でasyncio.get_event_loop()によってイベントループを取得しているところに注意してください。イベントループは1スレッドに1つ与えられるので、ここで得られるイベントループは最初に取得したものと同じです。

ええっと。気持ちはわかります。これだと、単純に関数を再帰させるのと何が違うんだって思いますよね。もうひと工夫してみましょう。

# coding=utf-8

import asyncio
import sys
from random import randint


def random_hit(future, n_upper, count=1, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    value = randint(1, n_upper)
    if value == n_upper:
        print("Hit!")  # add
        future.set_result(count)
    else:
        print("Not yet.")  # add
        count += 1
        loop.call_soon(random_hit, future, n_upper, count, loop)


def eternal_hello(loop):  # add
    print("Hello!")
    loop.call_soon(eternal_hello, loop)


def _callback(future):
    print("DONE!")
    loop = asyncio.get_event_loop()

try:
    n = int(sys.argv[1])
except IndexError:
    print("Input an integer.")
    n = int(input())
if n < 1:
    n = 1

loop = asyncio.get_event_loop()
future = loop.create_future()
future.add_done_callback(_callback)
loop.call_soon(random_hit, future, n)
loop.call_soon(eternal_hello, loop)  # add
result = loop.run_until_complete(future)
print(result)
loop.close()

# > python future.py 10
# Not yet.
# Hello!
# Hit!
# Hello!
# DONE!
# Hello!
# 2

あまり大きな値は入力しないように(ハロー地獄のはじまりです)。さっきとの違いは、"Hello!“ と挨拶して、またその仕事をイベントループに押し付けるウルサイやつがいることと、あとは少しprint文を増やしたくらいです。この仕事をrandom_hitの仕事依頼の次の行で頼みました。一応言っておくと、複数の仕事がある場合は先に追加したほうから処理していきます(FIFOってやつですね)。

興味深いことに、単にrandom_hit再帰させるのと違って、まだ Hit! していないにもかかわらず、eternal_helloが挨拶してきました!とはいえ、そんな不思議でもありませんよね。仕事の処理はFIFOで、random_hitの次の仕事は先約のeternal_helloの後に行われるはずですから。

Hit! してから実際にイベントループが停止するまでの挙動を少し見てみましょう。Hello! DONE! Hello! と続き、そして結果が出力されています。特に驚くことでもありませんが、Futureオブジェクトに追加した完了時コールバックは、値が設定されたときに、仕事としてイベントループに押し付けられます。run_until_complete()は内部でloop.stop()を呼ぶといいましたが、これもFutureオブジェクトの完了時コールバックとして呼ばれます。つまり、真相はこうなります:

  • Hit!:このときにFutureオブジェクトは完了し、コールバックをイベントループに仕事として渡す
  • 最初のHello!:コールバックより先約のeternal_helloが挨拶し、また挨拶の仕事を押し付ける
  • DONE!:先約の仕事がすべて終わり、完了時コールバックが実行される
  • 一連の完了時コールバックの最後のタイミングでloop.stop()が呼ばれる。以下は残業。
  • 2番目のところで押し付けられた挨拶の仕事を残業としてこなす。このとき生じた新たな挨拶の仕事は次回実行時に持ち越し。
  • 残業おわり!イベントループは無駄な残業をせずにとっとと帰るのであった。

asyncioモジュールが「イベントループに仕事を依頼していく」という形で動いているというのが分かりますね。

最後の例では、ほんの少しだけですがイベントループの器用さが垣間見れました。でもまだ「仕事を中断して別の仕事に取りかかる」という器用さの真髄はお目にかかれていません。それもそのはず。私たちは未だ、そういう風な処理ができるような形の仕事をループちゃんに与えていないからです。そろそろループちゃんの本領が見れそうですよ!

コルーチンとタスク

お次はコルーチンです。コルーチンはジェネレータを定義するときと同じように、関数定義に近い形で書きます。Python3.5から、コルーチン定義の文法が組み込まれました:

async def hello():
  print("Hello!")

このとき、hello()はコルーチンオブジェクトです。ジェネレータと似てますね*1

コルーチンというのは、まさに イベントループに「待ち時間がどこで発生するか」を教えてあげられる 関数みたいなものです。

ジェネレータと似ているところがもう一つあります。ジェネレータではyield from [generator]によって、別のジェネレータをチェーンすることができましたが、同様にコルーチンはawait [coroutine]によってコルーチンをチェーンすることができます。

コルーチンチェーンとしての役割も大事ですが、awaitのより重要な役割は、await [future]という形式です。つまり、awaitの後にはFutureオブジェクトも置けるわけですが、まさにこの形式こそが イベントループにとっての「待ち時間」の印 になります。

順を追って話しましょう。まず、イベントループにコルーチンを仕事として渡すには、関数のときとは異なる手順を踏みます。コルーチンは、FutureのサブクラスであるTaskオブジェクトにくるまれて、イベントループと関わり合いをもちます。Taskオブジェクトの作り方は2つあります:

  • loop.create_task(coroutine)
  • asyncio.ensure_future(coroutine_or_future)

ensure_future にFutureオブジェクトを渡した場合は、そのオブジェクトがそのまま返ってきます(Taskオブジェクトにはならないことに注意!)。

また、loop.run_until_complete()にはコルーチンを渡すこともできます。そのとき、コルーチンは内部でensure_future()によってTaskオブジェクト化されます。

というわけで、Taskオブジェクトを作るときにはコルーチンを渡します。Taskオブジェクトはこのときに「タスクのステップ」という仕事を(今までやってきたようにcall_soonを使って)イベントループに頼みます。

「タスクのステップ」の仕事内容は次の通りです:まず、コルーチンをawait [Future]にぶつかるまで進めます。コルーチンにFutureオブジェクトへのawaitがなかった場合は、ふつうにコルーチンの最後まで実行され、その返り値がTaskオブジェクトの結果として設定され、完了します(TaskはFutureのサブクラスであることに注意)。もし、Futureオブジェクトへのawaitがあった場合、コルーチンはジェネレータのyieldよろしく、その時点で一旦停止します。そして、そのFutureオブジェクトに完了時コールバックとして「タスクのステップ」を追加します。次回、コルーチンは一旦停止したところからFutureオブジェクトの結果を受け取って再開します。

これが「タスクのステップ」という仕事の概要です。この妙技の仕組みは、もうお分かりになるかと思います。まず、Taskオブジェクトは、Futureオブジェクトの完了時コールバックとして「タスクのステップ」が仕事として依頼されない限り、イベントループが触ることはありません!つまり、処理は先延ばしされる わけです。このTaskオブジェクトの続きは、Futureオブジェクトが完了しないとどうすることもできないわけですから、これは実に理にかなっています。

例として、await対象としてよく使われるasyncio.sleepと似たようなコルーチンを書いてみましょう。このコルーチンは、delay 秒だけ動きを止めたのち、resultを返します:

import asyncio

async def sleep(delay, result=None, *, loop=None):
    if delay == 0:
        return result

    if loop is None:
        loop = asyncio.get_event_loop()
    future = loop.create_future()
    h = loop.call_later(delay, future.set_result, result)
    try:
        return (await future)
    finally:
        h.cancel()

このコルーチンは内部でFutureオブジェクトをつくり、delay秒後にresultをそのFutureオブジェクトにセットするようcall_laterしています。それから、await futureすることで、このコルーチン(のTaskオブジェクト)の処理を future が完了するまでイベントループから遠ざけます。futureの完了はもちろん、delay秒後に完了します!

実際にasyncio.sleepを使ってみましょう。以下の例は割とどこででも見るものです。

# coding=utf-8

import asyncio
import sys
from random import randint


async def boring_counting(id_, n):
    for i in range(n):
        print("eh, I am {}. {} :(".format(id_, i))
        await asyncio.sleep(0.2)
    print("FINISH! by {} :)".format(id_))


def get_input():
    try:
        n = int(sys.argv[1])
    except IndexError:
        print("Input an integer.")
        n = int(input())
    if n < 1:
        n = 1
    return n

n = get_input()
loop = asyncio.get_event_loop()
futures = set()
for i in range(5):
    futures.add(boring_counting(i, n))

loop.run_until_complete(asyncio.wait(futures))
loop.close()

# > python future4.py 3
# eh, I am 0. 0 :(
# eh, I am 1. 0 :(
# eh, I am 2. 0 :(
# eh, I am 3. 0 :(
# eh, I am 4. 0 :(
# eh, I am 0. 1 :(
# eh, I am 2. 1 :(
# eh, I am 4. 1 :(
# eh, I am 1. 1 :(
# eh, I am 3. 1 :(
# eh, I am 0. 2 :(
# eh, I am 4. 2 :(
# eh, I am 3. 2 :(
# eh, I am 2. 2 :(
# eh, I am 1. 2 :(
# FINISH! by 0 :)
# FINISH! by 3 :)
# FINISH! by 1 :)
# FINISH! by 4 :)
# FINISH! by 2 :)

asyncio.sleepがつくったFutureオブジェクトが0.2秒後に完了し、それによってコルーチンが再開します。それがn回続いたあと、コルーチンが終わることでTaskオブジェクトが完了します。ちなみに、同じ秒数のcall_laterが複数あった場合、その順序はよく分かりません。ので、カウント1以降は出力の順番はばらばらになっています。

ところで、なんだか見慣れないloop.run_until_complete(asyncio.wait(futures))によってループを実行していますね。

asyncio.wait と asyncio.gather

asyncio.wait(futures)はコルーチンで、デフォルトでは「引数の中のFutureオブジェクトがすべて完了したら完了する」ようなコルーチンです。ですから、今回の実行は「futuresの中のコルーチンが全部終わるまでループを実行し続ける」という条件になっています。ちなみに、waitは他にも「どれか1つでも完了すれば完了」「例外を誰かが吐いたら完了」という風な条件も設定できます。

asyncio.wait

ちなみに、waitに渡す引数の中のコルーチンは、内部でTaskオブジェクトにくるまれるので、

n = get_input()
loop = asyncio.get_event_loop()
futures = set()
for i in range(5):
    futures.add(boring_counting(i, n))  # tweak

loop.run_until_complete(asyncio.wait(futures))
loop.close()

としても大丈夫です。とはいえ、安全のため ensure_future しておきましょう。

waitは返り値として「完了したFutureオブジェクトの集合」と「未完のFutureオブジェクトの集合」のタプルを返してくれます。ただし、集合ですから、渡したシーケンスの順序は保たれないと思った方がいいでしょう。

渡したシーケンスの順序を保ってほしい場合はasyncio.gatherの出番です。この関数は可変個のFutureオブジェクトをとり、その順番を保ったまま、その 結果 を返してくれます。あ、安心してください!もちろんコルーチンを渡しても、Taskオブジェクトにくるんでくれますよ!結果が欲しい場合はgatherのほうがいいかもしれませんね。

asyncio.gather


さっきのコードを実行して気づいてほしいのは、プログラムが終わるまでに 0.2n 秒程度しかかかってないだろうということです!それぞれのコルーチンは各段階で 0.2秒スリープするはずですから、5*0.2n で n秒はかかりそうな気もします。もちろん、そうはならず、イベントループはasyncio.sleepのところでブロッキングされずに他の仕事を進めることができます。それでも 0.2n秒かかってしまうのは、単に 他にすることがなくなっちゃった から待ってるだけです。これがイベントループの器用さの真髄です!

さて、もう一つ例を出してみますね:

# coding=utf-8

import asyncio
import sys
from random import randint


async def random_hit(n_upper, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def _callback(future):
        print("DONE!")
    future = loop.create_future()
    future.add_done_callback(_callback)

    def _random_hit(n_upper, count):
        value = randint(1, n_upper)
        if value == n_upper:
            print("Hit!")
            future.set_result(count)
        else:
            print("Not yet.")
            count += 1
            loop.call_soon(_random_hit, n_upper, count)

    loop.call_soon(_random_hit, n_upper, 1)
    return (await future)


def get_input():
    try:
        n = int(sys.argv[1])
    except IndexError:
        print("Input an integer.")
        n = int(input())
    if n < 1:
        n = 1
    return n

n = get_input()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(random_hit(n))
print(result)

# > python future3.py 10
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Not yet.
# Hit!
# DONE!
# 10

これは少し前にやったrandom_hitのコルーチン版です。コルーチンの中に関数定義が入り込んで少し複雑になっているような気もしますが、何より見て欲しいのは、ループ実行の前準備の部分です:

n = get_input()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(random_hit(n))

たった3行!もちろん、nの値の設定を関数化したこともありますが、見てほしいのは断然ループとかの方です。イベントループを取得し、random_hit(n)コルーチンが終わるまでループを走らせる。それしかしていません。

何より変わったのは、Futureオブジェクトをコルーチン内部で用意しているというところです。loop.run_until_completeがコルーチンの値を返してくれるので、Futureオブジェクトをこっち側が保持しておく必要がなくなりました。確かに、Futureオブジェクトを内部で用意するために、内部に関数定義を持たなければならないようにもなりましたが、コルーチン使用者側からすればFutureオブジェクトの世話をしなくてよくなり、ぐっと使いやすくなりました。

薄々感じているだろうとは思いますが、asyncioの実際上の基本ユニットはコルーチンです。waitgatherloop.run_until_completeを見ると、いかにも 「コルーチンを使え」 と言ってますよね。

というわけで、ここからは積極的にコルーチンを使っていくことにしましょう。

I/Oに向かって

asyncio.sleepはよくasyncioモジュールにおけるブロッキングの代わりとして用いられますが、実態はただのcall_laterです。なんというか、騙された気分になりませんか? asyncioモジュールは、その名が本当なら非同期I/Oのためのモジュールなはずです。やっぱりI/Oが実際にどうやって非同期になされるのかというのをみないと、このモジュールを理解したことにはならないでしょう。

Pythonで扱えるI/Oは、そりゃたくさん様々にあります。標準I/Oに、ファイルI/Oに、それから、今から嫌なくらいに見ていくプロセス間通信です。asyncioモジュールがターゲットとしているのはどうやらプロセス間通信のようです。それもそのはずで、そもそもasyncioモジュールの成し遂げるシングルスレッドでの非同期な処理形態というのは、レイテンシのある通信を一度にたくさんしたいときに使えるワザだからです。というわけで、以下ではI/Oの中でも、プロセス間通信について見ていきます(ちなみに、標準I/OやファイルI/Oもちょっと細工をしてあげれば、イベントループはブロッキングせずに上手く扱ってくれるようになります。いや、んー、まあ、この場合はイベントループがすごいというよりは、マルチスレッドがすごいということになるんですけど、詳しくは後で話します)。

まずは明かりを灯そう:ソケット

低水準な通信方法にはいくつかありますが、ここではソケットだけを取り上げることにします(他のについてはドキュメントを見てください)。

とはいえ、そもそも、ソケットのノンブロッキングな扱い方といえば、既にselectorを用いた方法があります!イベントループがこれを利用しない手はありませんし、実際そうしています。

イベントループは自前のselectorを持っており、かなり頻繁にそれをチェック(もちろんノンブロッキングで) しています*2。低水準なソケットの操作は、そのselectorにソケットを登録し、読み書き可能になり次第、登録時に渡しておいたコールバック関数で処理するようにします。もちろん、コールバックはそのソケットでの読み書きが可能になり次第、イベントループに仕事として追加されます!

ソケットをループ(のselector)に登録するには次のメソッドを用います:

  • loop.add_reader(fd, callback, *args)
  • loop.remove_reader(fd)
  • loop.add_writer(fd, callback, *args)
  • loop.remove_writer(fd)

fd はファイルディスクリプタですが、ソケットをそのまま渡しても大丈夫です。

肝心の読み書きは次のメソッドで行います。きっとコールバック内で使うことでしょう:

  • loop.sock_recv(sock, nbytes)
  • loop.sock_sendall(sock, data)

ソケットからデータが欲しいようなコルーチンは、Futureオブジェクトを作り、そのFutureオブジェクトをコールバックの引数として渡しておき、コールバックの中で読み出したデータをFutureオブジェクトにセットしてもらえばよさそうです。もちろん、コルーチンはそのFutureオブジェクトを使って await しておきます。

ソケットの接続に関しても、ループのselectorに監視してもらうことができます:

  • loop.sock_connect(sock, address)add_writerのコールバック内で
  • loop.sock_accept(sock)add_readerのコールバック内で

低水準ソケットを使いたいけれど、ソケット自体についてあまり知らないという人は、HOWTOをひとまず読むことをおすすめします。もしその気がないのであれば、低水準においてノンブロッキングI/Oが可能だということを知ってほしかっただけなので、以下は飛ばしてもらっても構いません。

簡単には、クライアントソケット(IPv4, TCP)の場合、

  • ソケットを作る(sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • ノンブロッキングにする(sock.setblocking(False)
  • ループのselectorに登録(loop.add_writer(sock, callback, *args)
  • コールバック関数中で、ソケットをつなぐ(loop.sock_connect(sock, address)

をするだけで、サーバソケット(IPv4, TCP)の場合は、

  • ソケットを作る(sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • ノンブロッキングにする(sock.setblocking(False)
  • bind(sock.bind(address, port)
  • listen(sock.listen(backlog)
  • ループのselectorに登録(loop.add_reader(sock, callback, *args)
  • コールバック関数中で、ソケットを受け入れ(loop.sock_accept(sock)

とすればいいでしょう。

補足:その他のソケット種

上ではIPv4アドレスファミリーを用いたTCPタイプのソケットを例に挙げましたが、IPv6Bluetoothアドレスファミリーを用いたり、UDPタイプや生タイプのソケットをつくることもできます。

socketモジュールにあるAF_*定数はアドレスファミリーを表し、SOCK_*定数はソケットタイプを表します。より詳しい内容はドキュメントをどうぞ。

もっと高水準を!:トランスポート/プロトコル

すべてのプロセス間通信はソケットに通ずる……とまでは言いませんが、大抵のプロセス間通信は確かにソケットでできそうです。おさらいすると、「コールバックを持たせてループに預ける」だけ。非常に簡単な方法で非同期なプロセス間通信が実現できたわけですが、接続、読み書き、切断という一連の処理をそれぞれのコールバック関数に書くことになるので、あるソケットの一生を追うのにお目々が行ったり来たり、少し面倒くさそうです。通信の一連の流れ、要はどんなコールバックがそれぞれの段階で呼ばれるのか、というのは一箇所に書いてあったほうが他の誰かにとって、そして未来のあなたにとっても親切でしょう。

というわけで、asyncioモジュールのトランスポート/プロトコルの出番です。プロトコルは、ある通信オブジェクト(≒ソケット)の一連の流れ(接続、読み書き、切断)をメソッドとして記述するためのクラスで、まさに今の私たちが求めているものです。

簡単に説明すると、プロトコルは所定のデータ様式を扱うための処理(所定のデータ様式のアレコレ)を担い、トランスポートは実際のI/Oやバッファリング(I/O現場のアレコレ)を担います。プロトコルファクトリーは各接続ごとにプロトコルインスタンスを新たに作って提供してくれます。

トランスポート

asyncioモジュールは現在 TCP, UDP, SSL, サブプロセスパイプのトランスポートを実装しています。通信において実際に相手とやりとりするのがトランスポートの役目です。各種トランスポートはそれぞれの通信の形態に対応しており、相応のメソッド群を持ち合わせています。

以下にトランスポートを列挙しますが、後で見るように、asyncioモジュールには、基本的なプロトコルがそれぞれのトランスポートを備えたようなものとして用意されているので、基本的にはトランスポートを直接触るようなことはないかと思います。

  • BaseTransport:基底トランスポート
  • ReadTransport:読み込み専用
  • WriteTransport:書き込み専用
  • DatagramTransportUDPに対応
  • BaseSubprocessTransport:サブプロセス通信に対応

プロトコル

asyncioモジュールには、そんな大がかりなプロトコルはなく、それぞれ異なるトランスポートをもつプロトコルとしてサブクラスが発展しているだけです(とはいえ、これらのおかげでトランスポートを直接触らずに済みますから、十分ありがたいものです)。

具体的にどういった様式をデータ単位として扱っていくかということについては、独自にプロトコルを組んでいくことになります。みんなにとって必要なものは恐らく誰かが作っていることでしょう!サードパーティに期待しましょう(たとえば、HTTPについては aiohttpモジュールがあります)。

一番よく使うであろう、Protocolクラスについて見ていきます。さあ、ソケットの一生を書き込みましょう:

  • BaseProtocol.connection_made(transport):コネクション作成時に呼び出し
  • BaseProtocol.connection_lost(exc):コネクションが失われた、閉じられたときに呼び出し
  • Protocol.data_received(data):データを受信したときに呼ばれる。
  • Protocol.eof_received():相手が送信するデータがないことを伝えてきたときに呼ばれる。

その他のプロトコルについてはドキュメントを見てください(プロトコル)。

コネクションの作成

コネクションは用いるソケットタイプやアドレスファミリーによって異なりますが、基本的には:

  • loop.create_connection()SOCK_STREAMTCP)を用いる通信
  • loop.create_datagram_endpoint()SOCK_DGRAMUDP)を用いる通信

を使います。引数には色々と渡せますが、基本的には プロトコルファミリー(あるいは単に、無引数呼び出しでプロトコルインスタンスを返すようなコーラブルオブジェクト)、ホストアドレスポート番号 を順に渡します。

このメソッドはコルーチン関数です。得られるコルーチンは接続を試し、接続が成功すると、プロトコルconnection_madeメソッドが呼ばれ、最終的に(トランスポート, プロトコル)のタプルが返ります。

一方、待ちうけコネクション(サーバー)には次のメソッドを使います。引数は上のメソッドと同じです:

  • loop.create_server

このメソッドもご期待通り、コルーチン関数です。得られるコルーチンは接続が成功するとサーバーオブジェクトを返します。サーバーオブジェクトには次のことができます:

  • server.close():非同期に、待機中のソケットを閉じ、サーバーを停止する
  • server.wait_closed()close()が完了するまで待機する(もちろん非同期に!)
  • sockets:サーバーが待機するソケットのリスト

ドキュメントが気になるなら、ここに置いておくのでご自由に:

流れとしては、プロトコルを渡してコネクションをつくり、あとはプロトコルに書いた通りにソケットが人生(ソケット生)を送るだけです。簡単なやりとり(たとえば、echo)なら、プロトコルの基本メソッドだけで完結すると思います。

データの送信

あ、データの送信について触れていませんでした!データの送信にはトランスポートのメソッドを用います:

  • TCPwrite(data)writelines(list_of_data)write_eof()
  • UDPsendto(data)

「トランスポートには触らないでいいって言ってたのに!」って? それは申し訳ない。でも「直接は触らない」って言ったように、私たちはプロトコル手袋をつけた状態でしかトランスポートを触ることはないはずです。プロトコルconnection_madeはトランスポートを引数として渡されて呼ばれるので、もしそのソケットでデータを送るようなことがあるなら、そのときにトランスポートに首輪でもはめておいてください。えっと、プロトコルインスタンスの属性にでもしておいてくださいってことです。

プロトコルオブジェクトに送信用のメソッドを定義しておくのはいい手かもしれません。特に、渡されたデータに何か小細工してから相手に送りつけたり、その荷物にドラッグが仕込まれてないかを確認したりしたい場合はそうするべきです(ですが、もっと良い方法があって、それはすぐ後で出会うことになります。もう少しだけお待ちを!)。

プロトコルの典型的な実装などについては、標準モジュールのブロッキングプロトコルを参考にするといいかもしれません(インターネットプロトコル)。

ドキュメントの例を載せます:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

これが、あるソケットの数奇な人生の記録です。接続時に陽気に ‘Hello World!’ と相手に送りつけ、受け取ったデータを単に出力します。なんて退屈な人生なんだ……。接続が切れると、彼はループを止め、やがてプログラムも終了します。

ループを2回に分けて回していることに注意してください。こういうのは今までに見ませんでしたね。loop.run_until_completeは、渡されたFutureオブジェクトに例外が設定されたときはその例外をあげます。2回に分けて実行することで、万が一接続がうまく行かなかったときにループがそのまま永遠に走り続けるというようなことを防止できるわけですね!

続いて、サーバー側も見てみましょう。さっきと同じで、ドキュメントにある例を載せます:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

本当に単純なエコーサーバーです。接続すると、接続した相手の名前をプリントします。相手からデータがきたら、それをプリントし、トランスポートを使ってそのまま相手に送り返します(エコーだからね!)。そのあと、トランスポートを閉じ、これ以上このコネクションでやり取りをしない旨を伝えます。ただし、サーバー自体が終わるわけではないことに注意してください!サーバー自体は KeyboardInterrupt があがるまでは動作しつづけます。もし新たにクライアントが接続してきたら、さっきと同じことを繰り返してまた接続を切ります。なんて愛想の悪いコールセンターなんだ!

Ctrl+C によってループが止まれば、サーバーに閉じるよう命じ、ちゃんと閉じるまでループを回してから、ループも閉じます。めでたしめでたし。

※ ん?「Ctrl+C を押しても停止しない」? それは困りました…。もしかして Windowsを使ってませんか? Windowsだと、何も仕事をしていない待機中のイベントループにシグナルが上手く伝わらないようです。イベントループが仕事をしている間に Ctrl+C を押すと上手く反応してくれるので、応急処置としては、絶対に終わらない仕事を与えておく方法があります。あくまで応急処置として、ですけど。

もっともっと高水準を!:ストリーム

トランスポート/プロトコルを用いることで、単純にソケットをイベントループに渡すよりかは断然見通しがよくなりました! ソケットによる他のプロセスとのデータのやり取りはもうバッチリなはずです。でも、確かに本当にI/Oのまわりの装備は整ってきましたが、I/Oとコッチ側のやりとりのことを忘れていました。トランスポート/プロトコルは確かにソケットを繋いで、いい感じに相手からデータを受け取ってくれます。それはいいんですが、そのデータはどうやったら受け取れるんでしょう? 私たちがそのデータを受け取ったのは、そのデータを使って何かしたかったからにちがいありません。

加えて、トランスポートは相手方にバイナリデータを送ってくれますが、トランスポートに渡すために、毎回自分の手で(より抽象的な)データをプロトコルが規定する様式のバイナリデータに変換するのも面倒です。トランスポートが郵便屋さんだとしたら、小包の重さや寸法を調べてくれたり、必要な切手を貼ってくれたりする郵便局の受付係がいたら、とても楽だと思いませんか?

それではご登場願いましょう! ストリームはまさに配達業者と私たちを結ぶ窓口のような役割を果たしてくれます。私たちはStreamReader窓口で「荷物ください」と催促し、StreamWriter窓口に行って「この荷物をお願いします」とお願いすればいいのです。もちろん、各窓口は特定の配達業者と提携していて、私たちの荷物に適切な処置をした上で、その業者に荷物を流してくれます。

有り体に言えば、トランスポート/プロトコルよりも高水準なI/Oシステムです。

トランスポート/プロトコルとストリームの依存関係

ストリームはトランスポート/プロトコルの上になりたつものなので、そこには依存関係があります。簡単にいえば、個々のオブジェクトを作る際の引数に何を渡すのかという関係です。モジュールの構造の関係か、論理上の階層(ストリームがトランスポート/プロトコルより上という階層)と少しだけ変わっています:

  • StreamReader:「最下層」のオブジェクトです。
  • StreamReaderProtocol:ストリーム用にカスタムされたプロトコルで、引数にStreamReaderを取ります。
  • StreamWriter:「最上層」のオブジェクトです。トランスポート、プロトコル、リーダーの3つを引数にとります。

分かるとは思いますが、必要な読み書きストリームをつくる際には

  1. StreamReaderオブジェクトを手に入れる
  2. それを使ってStreamReaderProtocolオブジェクトを手に入れる
  3. それら(+トランスポート)を使ってStreamWriterを手に入れる

という流れになります。とはいえ、StreamReaderだけを作っても特にできることは何もないし、まともに使いたいのであればStreamReaderProtocolは必須です。基本的に、ストリームを使うときはStreamWriterまで作るはずなので、なんであれ、トランスポート/プロトコルは必須です。

StreamReader と StreamReaderProtocol

まずは「受信窓口」から始めましょう。StreamReaderはその内部に倉庫(バッファ(bytearray))をもっていて、StreamReaderProtocolは自分が受け取ったデータをどんどんそこに放り込んでいきます。私たちはStreamReaderのインターフェースを通して倉庫に保管してあるデータを受け取れます。

  • read():引数にバイト数を渡せる。デフォルトではEOFまで読み込む
  • readexactly():引数にバイト数を渡して、厳密にそれだけ読み込む
  • readline():改行がくるまで読み取る。EOFがきた場合は残りをすべて読み込む
  • readuntil()separator引数に渡したバイト文字列までを読み込む

何より大事なのは、これらがコルーチンだということです!これらの読み取りメソッドはバッファに十分なデータがたまるまで、受け渡しを await してくれます。

私たちとStreamReaderを繋ぐのは上で見たread系のメソッドです。配達業者側、つまりトランスポート/プロトコルとはfeed系のメソッドで荷物を受け付けます:

  • feed_data()
  • feed_eof()

これらのメソッドを私たちが直接触ることはないでしょう。StreamReaderProtocolStreamWriterが内部でこれらのメソッドを良しなに使っているので、これらのサブクラスとして実装すればいいだけです。

StreamReaderProtocolインスタンスをつくる際に、第二引数としてclient_connected_cbにコールバック(関数でもコルーチンでも)を渡すことができます。このコールバックは、接続したときに呼ばれるもので、connection_madeメソッドと似た役割をもちます。

StreamWriter

StreamWriter はほとんどトランスポートのラッパです;write(data)writelines(list_of_data)write_eof()メソッドをインターフェースとしてもっており、同名のトランスポートのメソッドを呼び出します。

コネクション

デフォルト(TCP)のStreamReader, StreamWriter, StreamReaderProtocol で十分な場合は、open_connectionが使えます。これはcreate_connectionのラッパーです。実際のソースコード(を若干改変したもの)を見てみます:

async def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

案の定、まずリーダーを手に入れ、それを使ってプロトコルを手に入れます。それから、create_connection して、トランスポートを手に入れます。最後に、その3つをすべて使って、ライターを手に入れ、リーダーとライターを返します。簡単ですね!

同様に、create_serverのラッパ的存在としてstart_serverもあります:

async def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return (await loop.create_server(factory, host, port, **kwds))

まあ、やってることは大したことないですよね。

これでだいたい終わり

asyncioモジュールの根幹であるイベントループからはじまり、Futureオブジェクト、コルーチン、トランスポート/プロトコル、ストリームとみてきました。asyncioモジュールの大体の要素は見てきたので、モジュールのドキュメントの大半は見たことあるものになってるかと思います。

サブプロセスだけはやってないですが、ここまで理解していればそんな大したことないでしょう(憶測)。それではレッツエンジョイ asyncio!

あっ、run_in_executor書いてない・・・・・・

*1:実際、Python3.4以前ではコルーチンはジェネレータの形で書いていましたから、当然といえば当然ですね。

*2:具体的にどのくらい頻繁かというと、新しい仕事を始める前に毎回チェックしているようです。

位相の思考メモ

とんちきなメモです。

トポロジー的な「となり」というのを点と点の関係として記述するのは少し勝手が悪い。

たとえば、というかここではもっぱらそういうものしか想定しないけど、ネットワークのトポロジーを考えたときに、当然ながら推移律は成り立たない。でも対称律はなりたつ。反射律についてはとりあえず保留。

で、推移律というのは結構使えるやつで、別に推移律でもなくてもいいんだけど、2つの関係があったときにそこから新しい関係を導けるような規則というのは間違いなく使える。

そこで、点と点の関係として「となり」を捉えるのではなくて、「となり」という概念を領域(集合)(の性質)で捉えることにしてみる。とはいえ、さっきと何か変わるわけでもなく、今のところ、ある集合が該当するとき、それは「その集合内の点は互いにとなり合わせだ」ということにする。これはかなり強い制約で、さっきの点と点の関係と同じ強さだ。グラフ理論のことばでいえば、今のところは「となり」という概念を完全な部分グラフとして理解しようとしている。

とはいえ、さっきの点と点の関係を、領域の性質として単純に読み替えただけなので、やっぱり推移律に相当する規則は存在しない。でも少しだけ進歩があって、「となり」という概念を理解するにあたって、脱焦点化(とでも言っておく)に成功した。構造の強さとしては今だに使い勝手は悪いものの、「となり」という概念を点から引き剥がせている。

でもやっぱりあの規則がほしい!今の文脈で言えば、2つの領域から新たに「となり」の性質をもつ1つの領域を生み出せるような規則がほしい。

「互いにとなり合わせだ」とせずに、それぞれの点において、隣であるような点を集めて、それを1つの領域として捉えたらどうだろう?このような領域は、「この集合内の点すべてと隣であるような点がある」ような領域になる。これもまあ1つの「となり」を捉えられている。その集合の任意の2点が隣だということではないが、少なくともその集合は内部に自身を全体とするような「隣接ネットワーク」を備えている。注意してほしいのは、さっきの素朴な2点の関係よりも幾ばくか構造が弱くなっている。ワンチャンあるかも?

とりあえず、内部に向かっての規則がつくれる。集合内の点すべてと隣であるような点が見つけ出せれば、それを含むような任意の部分集合はやはりそのような領域になる。それから、2つの集合の共通部分にそのような点があれば、その2つの集合の和集合はやはりそのような集合になる。でも、まだ点から逃れていない。それにこういう領域のとり方では本質的にどの点がそういう点なのかということを知ることはできない。

見方を変えよう。脱焦点化するには、逆にその集合のすべての点において言えることを性質として見いだせればよい。そう考えると、こうだ:「その集合内のどの点も、その中の少なくとも1点と隣関係にある」。これは間違いなくそうだろう。この性質を保持するような規則を考えてみると、任意のそのような2つの集合の和集合は、この性質を満たすことがわかる!ただし共通部分についてはそうはいかない。

もっとゆるい性質を考えよう。たとえば:「その集合に属するどの点も、隣となる点を(集合の内外を問わずどこかに)もつ」。これもギリギリ「となり」を捉えられている。「となり」というのを最初は2点の関係として捉えていたが、ここでは思い切って、点の性質とみなしているわけだ。このような性質は大抵のことに耐える。というか、「隣」という属性をもつ点の集合であるから、一般的な一項述語の外延的な集合になりたつことならなんでもなりたつ。

方向転換。そもそも「隣」関係にあることの必要条件ってなんだろう?と考えると、本質的に重要な1つの関係にぶち当たる。それはその2つの点が分離しているという条件だ。「隣にある」というと、第一印象として「2つは近いんだ」ということを感じがちだが、そもそもの前提として、「隣にある」といえるときはその2つは分離しているのだ。2点が「隣でない」というのは、その2つが遠いという印象だけでなく、もしかするとその2つは分離さえしていないと考える必要がある。

ただ、分離という用語を性質として使ってしまうと、その点がどんな点とも分離しているという風に感じとれてしまう。ここでいう分離というのはあくまで二項関係であることに注意してほしい。いっそ、逆に、「密着している」の否定としての関係と捉えるべきかもしれない。ある点が密着しているというと、それはすべての点と密着しているという印象よりも、何か他の点との関係を想起すると思う。

この「密着していない」という性質を、さきほどと同じく領域の性質として捉えることはできないか。と考えると、「まともな領域」というのは密着している2つの点を引き剥がすような線の引き方をしないように思える。と考えると、「密着していない」という概念を、「その集合内のどの点も、外部に密着した点をもたない」という性質として捉えられないだろうか。このような性質を保持するような規則について、和集合も共通部分もこの性質を保持するだろう。全体集合はこの性質を保持する。∅も然り。

この性質は、「その集合内のどの点も、その中の少なくとも1点と隣関係にある」という性質と結構似ている。でも異なる部分もある。それは、隣をもつ⇒非密着性だが、その逆は真でないことにもっぱらよる。全く誰とも隣関係にない中には密着した点もいないようなものがある。つまり、非常に離れたところにある点というものを、今議論している領域は含んでいてもよいのだ。

いまいちど注意しておくが、この領域はけっして「その互いが非密着関係にある」というような領域ではない。あくまでその外と「安全に」切り離されているような領域だということを意味する。

このような性質を満たす領域のあつまりは位相構造を表すはず。結局、位相構造というのは「近さ」とか「隣接」のモデルではなくて、その根本にある「分離性(非密着性)」、「点の空間的な個体性」のモデルなんだなあ。でも確かにネットワークトポロジーにおいても、何が重要かっていうとノードとノードの隣接性よりは、その2つのノードが密着したものではないということな気もするな。ノードのアイデンティティというか。

そして確かに、この領域というのは開集合的。とくに連続した空間を考えたときに、その境界というのはどうしてもすぐ傍に密着した点をもちうる。となると、境界をもたない集合でないと非密着な集合をつくるのは難しいように直観的に感じる。