Подтвердить что ты не робот

Python script использует цикл while для обновления сценариев работы и многопроцессорных задач в очереди

Я пытаюсь написать python script сканирование папки и сбор обновленного SQL script, а затем автоматически вытащить данные для SQL script. В коде цикл while проверяет новый файл SQL и посылает функцию pull pull. Мне трудно понять, как сделать динамическую очередь с циклом while, но также иметь многопроцессор для запуска задач в очереди.

В следующем коде есть проблема, что итерация цикла while будет работать на длительной работе, прежде чем она переместится на следующую итерацию и соберет другие задания, чтобы заполнить пустой процессор.

Обновление:

  • Благодаря @pbacterio для обнаружения ошибки, и теперь сообщение об ошибке исчезло. После изменения кода код python может выполнять все сценарии работы во время одной итерации и распространять сценарии на четыре процессора. Тем не менее, он будет зависеть от долгой работы, чтобы перейти к следующей итерации, сканированию и отправке вновь добавленных сценариев работы. Любая идея, как восстановить код?

  • Наконец-то я понял, что решение см. ниже. Оказалось, что я искал

    the_queue = Queue()
    the_pool = Пул (4, worker_main, (the_queue,))

  • Для тех, кто наткнется на подобную идею, следующая архитектура этой автоматизации script конвертирует общий диск в "сервер для выталкивания SQL" или любой другой "очереди" очереди заданий.

    а. Питон script auto_data_pull.py, как показано в ответе. Вам нужно добавить свою собственную функцию работы.

    б. A 'batch script' со следующим:

    start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.py

    с. Добавьте задачу, запускаемую стартовым компьютером, запустите "batch script", Все это. Оно работает.

Код Python:

from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = compute(func, args)
        output.put(result)

#
# Function used to compute result
#

def compute(func, args):
    result = func(args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)


def query_sql(sql_file): #test func
    #jsl file processing and SQL querying, data table will be saved to csv.
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    fo = open(fo_name, 'w')
    print sql_file
    fo.write("sql_file {0} is done\n".format(sql_file))
    return "Query is done for \n".format(sql_file)


def check_files(path):
    """
    arguments -- root path to monitor
    returns   -- dictionary of {file: timestamp, ...}
    """
    sql_query_dirs = glob(path + "/*/IDABox/")

    files_dict = {}
    for sql_query_dir in sql_query_dirs:
        for root, dirs, filenames in os.walk(sql_query_dir):
            [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for 
                     filename in filenames if filename.endswith('.jsl')]
    return files_dict


##### working in single thread
def single_thread():
    path = "Y:/"

    before = check_files(path)
    sql_queue  = [] 

    while True:
        time.sleep(3)
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]  

        before = after

        sql_queue = added + updated
        # print sql_queue
        for sql_file in sql_queue:
            try:
                query_sql(sql_file)
            except:
                pass


##### not working in queue
def multiple_thread():

    NUMBER_OF_PROCESSES = 4
    path = "Y:/"

    sql_queue  = [] 
    before = check_files(path) # get the current dictionary of sql_files
    task_queue = Queue()
    done_queue = Queue()

    while True:         #while loop to check the changes of the files
        time.sleep(5)
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]  

        before = after  
        sql_queue = added + updated   

        TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
        # Create queues

        #submit task
        for task in TASKS:
            task_queue.put(task)

        for i in range(NUMBER_OF_PROCESSES):
                p = Process(target=worker, args=(task_queue, done_queue)).start()          
            # try:
            #     p = Process(target=worker, args=(task_queue))
            #     p.start()

            # except:
            #     pass 

        # Get and print results
        print 'Unordered results:'
        for i in range(len(TASKS)):
            print '\t', done_queue.get()
        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')        

# single_thread()
if __name__ == '__main__':
    # freeze_support()
    multiple_thread()

Справка:

4b9b3361

Ответ 1

Я понял это. Спасибо, что ответ вызвал эту мысль. Теперь script может запускать цикл while, чтобы отслеживать папку для нового обновленного/добавленного SQL script, а затем распределять данные по нескольким потокам. Решение поступает из queue.get() и queue.put(). Я предполагаю, что объект очереди заботится об общении сам по себе.

Это окончательный код -

from glob import glob
import os, time
import sys
import pypyodbc
from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support

def query_sql(sql_file): #test func
    #jsl file processing and SQL querying, data table will be saved to csv.
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    fo = open(fo_name, 'w')
    print sql_file
    fo.write("sql_file {0} is done\n".format(sql_file))
    return "Query is done for \n".format(sql_file)


def check_files(path):
    """
    arguments -- root path to monitor
    returns   -- dictionary of {file: timestamp, ...}
    """
    sql_query_dirs = glob(path + "/*/IDABox/")

    files_dict = {}
    try:
        for sql_query_dir in sql_query_dirs:
            for root, dirs, filenames in os.walk(sql_query_dir):
                [files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for 
                         filename in filenames if filename.endswith('.jsl')]
    except:
        pass

    return files_dict


def worker_main(queue):
    print os.getpid(),"working"
    while True:
        item = queue.get(True)
        query_sql(item)

def main():
    the_queue = Queue()
    the_pool = Pool(4, worker_main,(the_queue,))

    path = "Y:/"
    before = check_files(path) # get the current dictionary of sql_files
    while True:         #while loop to check the changes of the files
        time.sleep(5)
        sql_queue  = [] 
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]  

        before = after  
        sql_queue = added + updated   
        if sql_queue:
            for jsl_file in sql_queue:
                try:
                    the_queue.put(jsl_file)
                except:
                    print "{0} failed with error {1}. \n".format(jsl_file, str(sys.exc_info()[0]))
                    pass
        else:
            pass

if __name__ == "__main__":
    main()  

Ответ 2

Где вы определили sql_file в multiple_thread() в

multiprocessing.Process(target=query_sql, args=(sql_file)).start()

Вы не определили sql_file в методе, и, кроме того, вы использовали эту переменную в цикле for. Область переменной ограничивается только циклом for.

Ответ 3

Попробуйте заменить это:

result = func(*args)

:

result = func(args)