Python multiprocessing Pool – iterating over object methods?

Perhaps someone more fluent in Python's multiprocessing Pool code could help me out. I am trying to connect to several hosts on my network simultaneously (N at any one time) over a socket connection and execute some RPCs. As soon as one host finishes, I want the Pool to start on the next host, until all of them are done.

I have a class, HClass, with methods that do this, and a list of hostnames in hostlist, but I'm failing to grok the Pool examples on docs.python.org well enough to get this working.

A short snippet of code to illustrate what I’ve got so far:

import multiprocessing

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # do stuff
        # do more stuff
    ....

if __name__ == "__main__":
    objs = [HClass(hostname=current_host) for current_host in hostlist]
    pool = multiprocessing.pool(poolsize)
    results = pool.apply_async(objs.go())

So far I am blessed with this traceback:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed

After that, the process just hangs until I Control-C out of it.

Best answer

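I would try to keep interprocess communication down to a minimum: everything handed to a Pool worker (the function, its arguments and its return value) has to be pickled and sent between processes. It looks like all you really need to send is the hostname string, and the worker can build the HClass instance itself: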

for host in hostlist:
    pool.apply_async(worker, args = (host,), callback = on_return)
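The call in the question, `pool.apply_async(objs.go())`, has a different problem: it calls `go()` immediately in the parent process and hands its return value to the pool, instead of handing over a function plus its arguments. The traceback mentions a generator, so presumably the real `go` contains a `yield` (an assumption, since the snippet elides its body). Calling a generator function only returns a generator object, and pickle cannot serialize generators, hence the `PicklingError`. A minimal sketch of that failure, using a hypothetical generator-based `HClass.go` and a hypothetical `try_pickle` helper:

```python
import pickle


class HClass:
    """Stand-in for the question's class; ASSUMES go() is a generator function."""

    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # The 'yield' makes go() a generator function: calling it does not
        # run the body, it just returns a generator object.
        yield "processing " + self.hostname


def try_pickle(obj):
    """Hypothetical helper: return True if obj can be pickled, else False."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        # Python 3 raises TypeError for generators; Python 2 raised PicklingError.
        return False


h = HClass("h1")
print(try_pickle(h.go()))  # False: this is what apply_async(objs.go()) ends up sending
print(try_pickle("h1"))    # True: a plain hostname string pickles fine
```

Sending only the hostname string sidesteps the problem entirely, because strings always pickle.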

For example,

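import multiprocessing as mp
import time
import logging

# Send multiprocessing's logger output to stderr at INFO level.
logger = mp.log_to_stderr(logging.INFO)

hostlist = ['h1', 'h2', 'h3', 'h4'] * 3
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # Stand-in for the real socket/RPC work.
        logger.info('processing {h}'.format(h=self.hostname))
        time.sleep(1)
        return self.hostname

def worker(host):
    # Runs in a pool process: the HClass instance is built here, so only
    # the hostname string has to be pickled and sent over.
    h = HClass(hostname=host)
    return h.go()

result = []
def on_return(retval):
    # Called back in the parent process (by the pool's result-handling
    # thread) each time a task finishes successfully.
    result.append(retval)

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    for host in hostlist:
        # Queue every host; the pool runs at most `poolsize` at a time and
        # starts the next one as soon as a worker is free.
        pool.apply_async(worker, args=(host,), callback=on_return)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for all queued tasks to finish
    logger.info(result)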
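Since each task only needs a hostname, `Pool.imap_unordered` is another option worth considering. The sketch below is Python 3 and not part of the original answer; it reuses the `HClass`/`worker` idea with a shortened placeholder `go`, and `run_all` is a hypothetical helper name. The pool still runs at most `poolsize` hosts at once and hands out the next host when a worker frees up, results arrive in completion order, and an exception raised in a worker is re-raised in the parent when the loop reaches that result.

```python
import multiprocessing as mp
import time


class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # Placeholder for the real socket/RPC work against self.hostname.
        time.sleep(0.1)
        return self.hostname


def worker(host):
    # Only the hostname string crosses the process boundary; the HClass
    # instance is created inside the worker process.
    return HClass(hostname=host).go()


def run_all(hostlist, poolsize=2):
    """Hypothetical helper: process every host, at most `poolsize` at once."""
    results = []
    with mp.Pool(poolsize) as pool:
        # imap_unordered feeds hosts to idle workers and yields each result
        # as soon as it is ready; the loop finishes before the pool is torn down.
        for hostname in pool.imap_unordered(worker, hostlist):
            results.append(hostname)
    return results


if __name__ == "__main__":
    hosts = ["h1", "h2", "h3", "h4"] * 3
    print(sorted(run_all(hosts, poolsize=2)))
```

One design difference: with `apply_async` and only a `callback`, a task that raises is silently skipped unless you keep the returned `AsyncResult` and call `.get()` on it (or pass `error_callback`, available since Python 3.2), whereas `imap_unordered` surfaces the exception in the loop.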