Pythonの並列処理が非常に貧弱であることはよく知られています。スレッディングやGILの標準パラメータを無視するのは、技術が低いからではなく、使い方が不適切だからだと思います。Pythonのスレッドとマルチプロセッシングに関する教科書のほとんどは、素晴らしいものではありますが、退屈で長いものです。最初のうちは有用な情報がたくさん書かれていますが、日々の作業を本当に改善する部分までは書かれていないことが多いのです。
DDGで "python threading tutorial "というキーワードでよく検索されるのは、次のようなものです。
実際、producer/Consumerを使用してスレッド/マルチプロセッシングを処理する次のようなコード例があります:
#Example.py
'''''
Standard Producer/Consumer Threading Pattern
'''
import time
import threading
import Queue
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
# queue.get() blocks the current thread until
# an item is retrieved.
msg = self._queue.get()
# Checks if the current message is
# the "Poison Pill"
if isinstance(msg, str) and msg == 'quit':
# if so, exists the loop
break
# "Processes" (or in our case, prints) the queue item
print "I'm a thread, and I received %s!!" % msg
# Always be friendly!
print 'Bye byes!'
def Producer():
# Queue is used to share items between
# the threads.
queue = Queue.Queue()
# Create an instance of the worker
worker = Consumer(queue)
# start calls the internal run() method to
# kick off the thread
worker.start()
# variable to keep track of when we started
start_time = time.time()
# While under 5 seconds..
while time.time() - start_time < 5:
# "Produce" a piece of work and stick it in
# the queue for the Consumer to process
queue.put('something at %s' % time.time())
# Sleep a bit just to avoid an absurd number of messages
time.sleep(1)
# This the "poison pill" method of killing a thread.
queue.put('quit')
# wait for the thread to close down
worker.join()
if __name__ == '__main__':
Producer()
うーん......。ちょっとJavaに似ている気がします。
スレッド化された、マルチプロセスのソリューションにProducer / Consumeを使うのが間違っていると言いたいわけではありません。しかし、一般的なコードを書くのに最適な選択だとは思いません。
まず、サンプルの舗装クラスを作成する必要があります。次に、オブジェクトを通過させ、キューの両端を監視することでタスクを完了させるキューを作成します。
次に、Pythonを高速化するためにワーカークラスのプールを作成する必要があります。IBMのチュートリアルで示されているより良い方法を紹介します。これはプログラマー複数のスレッドを使ってウェブページを取得するときによく使う方法でもあります。
#Example2.py
'''''
A more realistic thread pool example
'''
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Producer():
urls = [
'http://..org', 'http://..com'
'http://..org', 'http://..com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Producer()
実行はできますが、なんと複雑なコードでしょう!初期化メソッド、スレッド追跡リスト、そして私と同じようにデッドロックに陥りやすい人にとっては悪夢のようなjoin文の数々。これは退屈の始まりに過ぎません!
これまでの成果は?基本的には何もしていません。上記のコードは、ほとんどいつもただパスしているだけです。(非常に基本的なアプローチで、エラーが起こりやすい)非常に費用対効果が悪いです。幸い、もっと良い方法があります。
#p#
はじめに:地図
mapは素晴らしい小さな機能ですが、Pythonの高速な並列コードの鍵でもあります。よく知らない人のために説明すると、mapは関数型言語Lispから派生したもので、map関数は別の関数を順番にマップすることができます。例えば
urls = ['http://..com', 'http://..com']
results = map(urllib2.urlopen, urls)
urlopenメソッドは、呼び出しのすべての結果を順番に返し、リストに格納するためにここで呼び出されます。このように
results = []
for url in urls:
results.append(urllib2.urlopen(url))
Map はこれらの反復を順番に処理します。この関数が呼ばれると、結果が順番に格納された単純なリストに戻ります。
なぜそんなに素晴らしいのかって?適切なライブラリを使えば、マップは並列処理を非常にスムーズに実行できるからです!
1つはマルチプロセシングで、もう1つはあまり知られていませんが強力なサブファイルです:
オフトピック:これは何ですか?ダミー・マルチプロセス・ライブラリをご存知ないのですか?私は最近知りました。マルチプロセシングのドキュメントの中に一文だけ書いてあるだけです。しかもその一文は、そんなものが存在することを知らせるだけのもの。このような投げやりに近いアプローチの結果は、想像を絶するものでしょうね!
ダミーはマルチプロセス・モジュールのクローンです。唯一の違いは、マルチプロセスモジュールがプロセスを使うのに対し、ダミーはスレッドを使うことです。つまり、データは一方から他方へ渡されます。このため、フレーム呼び出しがIOモードかCPUモードかを判断する必要がないため、両者の間でデータを前進させたり後退させたりするのが簡単で、探索プログラムには特に便利です。
マップ関数を使って並列処理を行うには、まずマップ関数を持つモジュールをインポートする必要があります:
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
再初期化:
pool = ThreadPool()
この単純な文は、example2.py で関数が行うすべての作業を置き換えます。つまり、有効なワーカーをいくつか作成し、次に来ることに備えて起動し、使いやすいように様々な場所に保存します。
Pool オブジェクトはいくつかのパラメータを取りますが、最も重要なパラメータは process です。これはプール内のワーカーの数を決定します。これを入力しないと、デフォルトであなたのコンピュータのカーネルの値になります。
CPUモードでマルチプロセッシング・プールを使用する場合、通常はコア数が大きいほど高速です。しかし、スレッド処理やネットワーク・バインディングのような処理を行う場合、状況はより複雑になるため、プールの正確なサイズを使用する必要があります。
pool = ThreadPool(4) # Sets the pool size to 4
複数のスレッドを実行した場合、スレッド間の切り替えは多くの時間を浪費するので、忍耐強く最適な数のタスクをデバッグしたほうがいいでしょう。
プールオブジェクトが作成されたので、簡単な並列プログラムをすぐに作ることができます。そこで、example2.pyのurlオープナーを書き換えてみましょう!
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://..org',
'http://..org/about/',
'"http://..com/pub/a/python/2300"/04/17/.html',
'http://..org/doc/',
'http://..org/download/',
'http://..org/getit/',
'http://..org/community/',
'https://..org/moin/',
'http://..org/',
'https://..org/moin/LocalUserGroups',
'http://..org/psf/',
'http://..org/devguide/',
'http://..org/community/awards/'
# etc..
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()
WATCH今回、コードはわずか4行ですべての作業を行いました。そのうち3行はまだ単純な固定書き込みです。mapを呼び出せば、前の例と同じことが40行でできたことになります!この2つの方法の違いをよりグラフィカルに示すために、それぞれの方法を別々に実行したときのタイムも計ってみました。
# results = []
# for url in urls:
# result = urllib2.urlopen(url)
# results.append(result)
# # ------- VERSUS ------- #
# # ------- 4 Pool ------- #
# pool = ThreadPool(4)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 8 Pool ------- #
# pool = ThreadPool(8)
# results = pool.map(urllib2.urlopen, urls)
# # ------- 13 Pool ------- #
# pool = ThreadPool(13)
# results = pool.map(urllib2.urlopen, urls)
#p#
# Single thread: 14.4 Seconds
# 4 Pool: 3.1 Seconds
# 8 Pool: 1.4 Seconds
# 13 Pool: 1.3 Seconds
とても素晴らしい!プールの大きさを微調整することが重要な理由も示しています。ここでは、9より大きいと速く走ります。
何千ものサムネイルを生成
CPUモードでやりましょう!私は仕事で定期的にたくさんのイメージフォルダを扱わなければなりません。そのタスクの一つがサムネイルの作成です。これは並列タスクですでに確立されています。
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
for image in images:
create_thumbnail(Image)
例としては少し難しいですが、基本的には、プログラムにフォルダを渡し、その中にあるすべてのイメージを取り込み、最終的にそれぞれのディレクトリにサムネイルを作成して保存するというものです。
forループの代わりにmapへの並列呼び出しが使われた場合:
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(create_thumbnail,images)
pool.close()
pool.join()
わずか数行のコードを変更するだけで、これは非常に大きなスピードアップです。この方法は、CPUタスクとioタスクをそれぞれのプロセスとスレッドで実行する限り、さらに高速化できますが、デッドロックが発生することもよくあります。全体として、mapの有用性と人間によるスレッド管理の欠如を考慮すると、これは美的に美しく、信頼性が高く、デバッグしやすいアプローチだと思います。
さて、記事は終わり。並行タスク完了まであと1行。




