blog

並列処理を1行で済ませる

Pythonの並列処理が非常に貧弱なのは有名な話です。その理由は、スレッドやGILといった標準的なパラメータを度外視すれば、技術不足というよりも、適切な使い方ができていないことにあると思います。Pyt...

Jul 16, 2025 · 14 min. read
シェア

Pythonの並列処理が非常に貧弱であることはよく知られています。スレッディングやGILの標準パラメータを無視するのは、技術が低いからではなく、使い方が不適切だからだと思います。Pythonのスレッドとマルチプロセッシングに関する教科書のほとんどは、素晴らしいものではありますが、退屈で長いものです。最初のうちは有用な情報がたくさん書かれていますが、日々の作業を本当に改善する部分までは書かれていないことが多いのです。

DDGで "python threading tutorial "というキーワードでよく検索されるのは、次のようなものです。

実際、producer/Consumerを使用してスレッド/マルチプロセッシングを処理する次のようなコード例があります:

#Example.py 
''''' 
Standard Producer/Consumer Threading Pattern 
''' 
  
import time 
import threading 
import Queue 
  
class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self) 
        self._queue = queue 
  
    def run(self): 
        while True: 
            # queue.get() blocks the current thread until 
            # an item is retrieved. 
            msg = self._queue.get() 
            # Checks if the current message is 
            # the "Poison Pill" 
            if isinstance(msg, str) and msg == 'quit': 
                # if so, exists the loop 
                break 
            # "Processes" (or in our case, prints) the queue item   
            print "I'm a thread, and I received %s!!" % msg 
        # Always be friendly! 
        print 'Bye byes!' 
  
def Producer(): 
    # Queue is used to share items between 
    # the threads. 
    queue = Queue.Queue() 
  
    # Create an instance of the worker 
    worker = Consumer(queue) 
    # start calls the internal run() method to 
    # kick off the thread 
    worker.start() 
  
    # variable to keep track of when we started 
    start_time = time.time() 
    # While under 5 seconds.. 
    while time.time() - start_time < 5: 
        # "Produce" a piece of work and stick it in 
        # the queue for the Consumer to process 
        queue.put('something at %s' % time.time()) 
        # Sleep a bit just to avoid an absurd number of messages 
        time.sleep(1) 
  
    # This the "poison pill" method of killing a thread. 
    queue.put('quit') 
    # wait for the thread to close down 
    worker.join() 
  
if __name__ == '__main__': 
    Producer() 

うーん......。ちょっとJavaに似ている気がします。

スレッド化された、マルチプロセスのソリューションにProducer / Consumeを使うのが間違っていると言いたいわけではありません。しかし、一般的なコードを書くのに最適な選択だとは思いません。

まず、サンプルの舗装クラスを作成する必要があります。次に、オブジェクトを通過させ、キューの両端を監視することでタスクを完了させるキューを作成します。

次に、Pythonを高速化するためにワーカークラスのプールを作成する必要があります。IBMのチュートリアルで示されているより良い方法を紹介します。これはプログラマー複数のスレッドを使ってウェブページを取得するときによく使う方法でもあります。

#Example2.py 
''''' 
A more realistic thread pool example 
''' 
  
import time 
import threading 
import Queue 
import urllib2 
  
class Consumer(threading.Thread): 
    def __init__(self, queue): 
        threading.Thread.__init__(self) 
        self._queue = queue 
  
    def run(self): 
        while True: 
            content = self._queue.get() 
            if isinstance(content, str) and content == 'quit': 
                break 
            response = urllib2.urlopen(content) 
        print 'Bye byes!' 
  
def Producer(): 
    urls = [ 
        'http://..org', 'http://..com' 
        'http://..org', 'http://..com' 
        # etc.. 
    ] 
    queue = Queue.Queue() 
    worker_threads = build_worker_pool(queue, 4) 
    start_time = time.time() 
  
    # Add the urls to process 
    for url in urls: 
        queue.put(url)  
    # Add the poison pillv 
    for worker in worker_threads: 
        queue.put('quit') 
    for worker in worker_threads: 
        worker.join() 
  
    print 'Done! Time taken: {}'.format(time.time() - start_time) 
  
def build_worker_pool(queue, size): 
    workers = [] 
    for _ in range(size): 
        worker = Consumer(queue) 
        worker.start() 
        workers.append(worker) 
    return workers 
  
if __name__ == '__main__': 
    Producer() 

実行はできますが、なんと複雑なコードでしょう!初期化メソッド、スレッド追跡リスト、そして私と同じようにデッドロックに陥りやすい人にとっては悪夢のようなjoin文の数々。これは退屈の始まりに過ぎません!

これまでの成果は?基本的には何もしていません。上記のコードは、ほとんどいつもただパスしているだけです。(非常に基本的なアプローチで、エラーが起こりやすい)非常に費用対効果が悪いです。幸い、もっと良い方法があります。

#p#

はじめに:地図

mapは素晴らしい小さな機能ですが、Pythonの高速な並列コードの鍵でもあります。よく知らない人のために説明すると、mapは関数型言語Lispから派生したもので、map関数は別の関数を順番にマップすることができます。例えば

urls = ['http://..com', 'http://..com'] 
results = map(urllib2.urlopen, urls) 

urlopenメソッドは、呼び出しのすべての結果を順番に返し、リストに格納するためにここで呼び出されます。このように

results = [] 
for url in urls: 
    results.append(urllib2.urlopen(url)) 

Map はこれらの反復を順番に処理します。この関数が呼ばれると、結果が順番に格納された単純なリストに戻ります。

なぜそんなに素晴らしいのかって?適切なライブラリを使えば、マップは並列処理を非常にスムーズに実行できるからです!

1つはマルチプロセシングで、もう1つはあまり知られていませんが強力なサブファイルです:

オフトピック:これは何ですか?ダミー・マルチプロセス・ライブラリをご存知ないのですか?私は最近知りました。マルチプロセシングのドキュメントの中に一文だけ書いてあるだけです。しかもその一文は、そんなものが存在することを知らせるだけのもの。このような投げやりに近いアプローチの結果は、想像を絶するものでしょうね!

ダミーはマルチプロセス・モジュールのクローンです。唯一の違いは、マルチプロセスモジュールがプロセスを使うのに対し、ダミーはスレッドを使うことです。つまり、データは一方から他方へ渡されます。このため、フレーム呼び出しがIOモードかCPUモードかを判断する必要がないため、両者の間でデータを前進させたり後退させたりするのが簡単で、探索プログラムには特に便利です。

マップ関数を使って並列処理を行うには、まずマップ関数を持つモジュールをインポートする必要があります:

from multiprocessing import Pool 
from multiprocessing.dummy import Pool as ThreadPool 

再初期化:

pool = ThreadPool() 

この単純な文は、example2.py で関数が行うすべての作業を置き換えます。つまり、有効なワーカーをいくつか作成し、次に来ることに備えて起動し、使いやすいように様々な場所に保存します。

Pool オブジェクトはいくつかのパラメータを取りますが、最も重要なパラメータは process です。これはプール内のワーカーの数を決定します。これを入力しないと、デフォルトであなたのコンピュータのカーネルの値になります。

CPUモードでマルチプロセッシング・プールを使用する場合、通常はコア数が大きいほど高速です。しかし、スレッド処理やネットワーク・バインディングのような処理を行う場合、状況はより複雑になるため、プールの正確なサイズを使用する必要があります。

pool = ThreadPool(4) # Sets the pool size to 4 

複数のスレッドを実行した場合、スレッド間の切り替えは多くの時間を浪費するので、忍耐強く最適な数のタスクをデバッグしたほうがいいでしょう。

プールオブジェクトが作成されたので、簡単な並列プログラムをすぐに作ることができます。そこで、example2.pyのurlオープナーを書き換えてみましょう!

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 
  
urls = [ 
    'http://..org', 
    'http://..org/about/', 
    '"http://..com/pub/a/python/2300"/04/17/.html', 
    'http://..org/doc/', 
    'http://..org/download/', 
    'http://..org/getit/', 
    'http://..org/community/', 
    'https://..org/moin/', 
    'http://..org/', 
    'https://..org/moin/LocalUserGroups', 
    'http://..org/psf/', 
    'http://..org/devguide/', 
    'http://..org/community/awards/' 
    # etc.. 
    ] 
  
# Make the Pool of workers 
pool = ThreadPool(4) 
# Open the urls in their own threads 
# and return the results 
results = pool.map(urllib2.urlopen, urls) 
#close the pool and wait for the work to finish 
pool.close() 
pool.join() 

WATCH今回、コードはわずか4行ですべての作業を行いました。そのうち3行はまだ単純な固定書き込みです。mapを呼び出せば、前の例と同じことが40行でできたことになります!この2つの方法の違いをよりグラフィカルに示すために、それぞれの方法を別々に実行したときのタイムも計ってみました。

# results = [] 
# for url in urls: 
#   result = urllib2.urlopen(url) 
#   results.append(result) 
  
# # ------- VERSUS ------- # 
  
# # ------- 4 Pool ------- # 
# pool = ThreadPool(4) 
# results = pool.map(urllib2.urlopen, urls) 
  
# # ------- 8 Pool ------- # 
  
# pool = ThreadPool(8) 
# results = pool.map(urllib2.urlopen, urls) 
  
# # ------- 13 Pool ------- # 
  
# pool = ThreadPool(13) 
# results = pool.map(urllib2.urlopen, urls) 

#p#

#                       Single thread:  14.4 Seconds 
#                              4 Pool:   3.1 Seconds 
#                              8 Pool:   1.4 Seconds 
#                             13 Pool:   1.3 Seconds 

とても素晴らしい!プールの大きさを微調整することが重要な理由も示しています。ここでは、9より大きいと速く走ります。

何千ものサムネイルを生成

CPUモードでやりましょう!私は仕事で定期的にたくさんのイメージフォルダを扱わなければなりません。そのタスクの一つがサムネイルの作成です。これは並列タスクですでに確立されています。

import os 
import PIL 
  
from multiprocessing import Pool 
from PIL import Image 
  
SIZE = (75,75) 
SAVE_DIRECTORY = 'thumbs' 
  
def get_image_paths(folder): 
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f) 
  
def create_thumbnail(filename): 
    im = Image.open(filename) 
    im.thumbnail(SIZE, Image.ANTIALIAS) 
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname) 
    im.save(save_path) 
  
if __name__ == '__main__': 
    folder = os.path.abspath( 
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840') 
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) 
  
    images = get_image_paths(folder) 
  
    for image in images: 
             create_thumbnail(Image) 

例としては少し難しいですが、基本的には、プログラムにフォルダを渡し、その中にあるすべてのイメージを取り込み、最終的にそれぞれのディレクトリにサムネイルを作成して保存するというものです。

forループの代わりにmapへの並列呼び出しが使われた場合:

import os 
import PIL 
  
from multiprocessing import Pool 
from PIL import Image 
  
SIZE = (75,75) 
SAVE_DIRECTORY = 'thumbs' 
  
def get_image_paths(folder): 
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if 'jpeg' in f) 
  
def create_thumbnail(filename): 
    im = Image.open(filename) 
    im.thumbnail(SIZE, Image.ANTIALIAS) 
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname) 
    im.save(save_path) 
  
if __name__ == '__main__': 
    folder = os.path.abspath( 
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840') 
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) 
  
    images = get_image_paths(folder) 
  
    pool = Pool() 
        pool.map(create_thumbnail,images) 
        pool.close() 
        pool.join() 

わずか数行のコードを変更するだけで、これは非常に大きなスピードアップです。この方法は、CPUタスクとioタスクをそれぞれのプロセスとスレッドで実行する限り、さらに高速化できますが、デッドロックが発生することもよくあります。全体として、mapの有用性と人間によるスレッド管理の欠如を考慮すると、これは美的に美しく、信頼性が高く、デバッグしやすいアプローチだと思います。

さて、記事は終わり。並行タスク完了まであと1行。

Read next

データ・セキュリティ - ダブル・クエリ・インジェクション分析

ダブルクエリインジェクションの紹介、ダブルクエリインジェクションの意味は、これは少し難しい説明ですが、一般的な用語では、ネストされたサブクエリです。サブクエリを理解し、クエリのキーワードはselectです、これは誰もが知っています。サブクエリを簡単に説明すると、select文の中にselectがあり、そのselect文の中にサブクエリがあるということです。

Jul 16, 2025 · 4 min read