Подтвердить что ты не робот

Как запустить os.walk параллельно в Python?

Я написал простое приложение на Java, которое берет список путей и генерирует файл со всеми путями файлов в этом исходном списке.

Если у меня есть paths.txt, у которого есть:

c:\folder1\
c:\folder2\
...
...
c:\folder1000\

Мое приложение запускает рекурсивную функцию на каждом пути многопоточности и возвращает файл со всеми путями файлов в этих папках.

Теперь я хочу написать это приложение в Python.

Я написал простое приложение, которое использует os.walk() для запуска через заданную папку и распечатывает пути к файлам.

Теперь я хочу запустить его параллельно, и я видел, что у Python есть несколько модулей для этого: многопоточность и многопроцессорность.

Какое лучшее, что нужно сделать? И в этом случае, как это делается?

4b9b3361

Ответ 1

Вот многопроцессорное решение:

from multiprocessing.pool import Pool
from multiprocessing import JoinableQueue as Queue
import os

def explore_path(path):
    directories = []
    nondirectories = []
    for filename in os.listdir(path):
        fullname = os.path.join(path, filename)
        if os.path.isdir(fullname):
            directories.append(fullname)
        else:
            nondirectories.append(filename)
    outputfile = path.replace(os.sep, '_') + '.txt'
    with open(outputfile, 'w') as f:
        for filename in nondirectories:
            print >> f, filename
    return directories

def parallel_worker():
    while True:
        path = unsearched.get()
        dirs = explore_path(path)
        for newdir in dirs:
            unsearched.put(newdir)
        unsearched.task_done()

# acquire the list of paths
with open('paths.txt') as f:
    paths = f.split()

unsearched = Queue()
for path in paths:
    unsearched.put(path)

pool = Pool(5)
for i in range(5):
    pool.apply_async(parallel_worker)

unsearched.join()
print 'Done'

Ответ 2

Это шаблон для потоков в python, который был полезен для меня. Я не уверен, что потоки будут увеличивать вашу производительность из-за того, что потоки работают в CPython.

import threading
import Queue
import os

class PathThread (threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print path + "/" + f

    def run(self):
        while True:
            path = self.queue.get()
            self.printfiles(path)
            self.queue.task_done()

# threadsafe queue
pathqueue = Queue.Queue()
paths = ["foo", "bar", "baz"]

# spawn threads
for i in range(0, 5):
    t = PathThread(pathqueue)
    t.setDaemon(True)
    t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()

Ответ 3

Даже потоковая передача может быть очень полезной для обхода каталога. Я использую следующий код для перемещения дерева SharePoint, получая довольно значительное ускорение для примерно 50 потоков. Эта конкретная программа возвращает пары (путь, данные) для всех файлов xml в структуре каталогов и может быть просто расширена для вашего использования. (Это вырезано и вставлено из моей программы, требуется дополнительное редактирование).

#unique string for error passing error messages
ERROR = '\xffERROR\xff'

class ScanWorker(threading.Thread):
    """Worker class for scanning directory structures.
    pathQueue: queue for pathnames of directories
    resultQueue: results of processFile, pairs of (path, data) to be updated
    """
    lock = threading.Lock()
    dirCount = 0
    def __init__(self, pathQueue, resultQueue):
        self.pathQueue = pathQueue
        self.resultQueue = resultQueue
        super().__init__()

    def run(self):
        """Worker thread.
        Get a directory, process it, and put new directories on the
        queue."""
        try:
            while True:
                self.processDir(self.pathQueue.get())
                self.pathQueue.task_done()
        except Exception as e:
            #pass on exception to main thread
            description = traceback.format_exception(*sys.exc_info())
            description.insert(0,
                "Error in thread {}:\n".format(
                    threading.current_thread().name))
            self.resultQueue.put((ERROR, description))
            self.pathQueue.task_done()

    def processDir(self, top):
        """Visit a directory
        Call self.processFile on every file, and queue the directories.
        """
        #Wait and retry a few times in case of network errors.
        #SharePoint is not reliable, gives errors for no reason
        for retryCount in range(30):
            try:
                names = listdir(top)
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="L", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("List: too many retries")
            raise lastError
        #it is not important to worry about race conditions here
        self.__class__.dirCount += 1
        #process contents
        for name in names:
            if isdir(join(top, name)): self.pathQueue.put(join(top, name))
            else: self.processFile(join(top, name))

    def processFile(self, path):
        """Get XML file.
        """
        #only xml files
        if not path.lower().endswith('.xml'): return
        filemtime = datetime.fromtimestamp(getmtime(path))
        #SharePoint is not reliable, gives errors for no reason; just retry
        for retryCount in range(30):
            try:
                data = open(path,'rb').read()
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="R", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("Read: too many retries")
            raise lastError
        self.resultQueue.put((path, data))

class Scanner:
    """Interface to the ScanWorkers
    Sharepoint is pretty fast compared to its delay and handles 50 workers well
    Make sure you only create one instance of Scanner!
    """
    def __init__(self, workers):
        #don't restrict the path queue length; this causes deadlock
        #we use a LIFO queue to get more depth-first like search
        #reducing average queue length and hopefully improving server caching
        self.pathQueue = queue.LifoQueue()
        #this is the output queue to the main thread
        self.resultQueue = queue.Queue(5)
        self.workers = workers
        #start workers
        for i in range(workers):
            t = ScanWorker(self.pathQueue, self.resultQueue)
            t.setDaemon(True)
            t.start()

    def startWorkers(self, path):
        #add counter
        self.added = 0
        #and go
        self.pathQueue.put(path)

    def processResult(self, wait=True):
        """Get an element from the result queue, and add to the zip file."""
        path, data = self.resultQueue.get(block=wait)
        if path==ERROR:
            #process gave alarm; stop scanning
            #pass on description
            raise ScanError(data)
        <do whatever you want to do with the file>
        self.resultQueue.task_done()
        self.added += 1

#main
try:
    #set up
    scanner = Scanner(threads)
    scanner.startWorkers(rootpath)
    pathQueue, resultQueue = scanner.pathQueue, scanner.resultQueue
    #scanner is rolling; wait for it to finish
    with pathQueue.all_tasks_done:
        while pathQueue.unfinished_tasks:
            #tasks are still running
            #process results
            while True:
                try: scanner.processResult(wait=False)
                except queue.Empty: break
            #no new files found; check if scanner is ready
            done = pathQueue.all_tasks_done.wait(timeout=1)
            if not done:
                #Not yet; print something while we wait
                print(
                    "\rProcessed {} files from {} directories [{} {}]  "
                    .format(
                        scanner.added,
                        ScanWorker.dirCount,
                        pathQueue.unfinished_tasks,
                        resultQueue.unfinished_tasks,
                    ), end='\r')
    #just to make sure everybody is ready: join the path queue
    pathQueue.join()
    #process remaining of result queue
    while resultQueue.unfinished_tasks: scanner.processResult(wait=True)
    #go to new line to prevent overwriting progress messages
    print()
except ScanError as e:
    print()
    print(*e.args[0], end='')
    print("Process interrupted.")
except KeyboardInterrupt:
    print("\nProcess interrupted.")
print()