Я пытаюсь написать python script сканирование папки и сбор обновленного SQL script, а затем автоматически вытащить данные для SQL script. В коде цикл while проверяет новый файл SQL и посылает функцию pull pull. Мне трудно понять, как сделать динамическую очередь с циклом while, но также иметь многопроцессор для запуска задач в очереди.
В следующем коде есть проблема, что итерация цикла while будет работать на длительной работе, прежде чем она переместится на следующую итерацию и соберет другие задания, чтобы заполнить пустой процессор.
Обновление:
-
Благодаря @pbacterio для обнаружения ошибки, и теперь сообщение об ошибке исчезло. После изменения кода код python может выполнять все сценарии работы во время одной итерации и распространять сценарии на четыре процессора. Тем не менее, он будет зависеть от долгой работы, чтобы перейти к следующей итерации, сканированию и отправке вновь добавленных сценариев работы. Любая идея, как восстановить код?
-
Наконец-то я понял, что решение см. ниже. Оказалось, что я искал
the_queue = Queue()
the_pool = Пул (4, worker_main, (the_queue,)) -
Для тех, кто наткнется на подобную идею, следующая архитектура этой автоматизации script конвертирует общий диск в "сервер для выталкивания SQL" или любой другой "очереди" очереди заданий.
а. Питон script
auto_data_pull.py
, как показано в ответе. Вам нужно добавить свою собственную функцию работы.б. A 'batch script' со следующим:
start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.py
с. Добавьте задачу, запускаемую стартовым компьютером, запустите "batch script", Все это. Оно работает.
Код Python:
from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = compute(func, args)
output.put(result)
#
# Function used to compute result
#
def compute(func, args):
result = func(args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
def query_sql(sql_file): #test func
#jsl file processing and SQL querying, data table will be saved to csv.
fo_name = os.path.splitext(sql_file)[0] + '.csv'
fo = open(fo_name, 'w')
print sql_file
fo.write("sql_file {0} is done\n".format(sql_file))
return "Query is done for \n".format(sql_file)
def check_files(path):
"""
arguments -- root path to monitor
returns -- dictionary of {file: timestamp, ...}
"""
sql_query_dirs = glob(path + "/*/IDABox/")
files_dict = {}
for sql_query_dir in sql_query_dirs:
for root, dirs, filenames in os.walk(sql_query_dir):
[files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for
filename in filenames if filename.endswith('.jsl')]
return files_dict
##### working in single thread
def single_thread():
path = "Y:/"
before = check_files(path)
sql_queue = []
while True:
time.sleep(3)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
# print sql_queue
for sql_file in sql_queue:
try:
query_sql(sql_file)
except:
pass
##### not working in queue
def multiple_thread():
NUMBER_OF_PROCESSES = 4
path = "Y:/"
sql_queue = []
before = check_files(path) # get the current dictionary of sql_files
task_queue = Queue()
done_queue = Queue()
while True: #while loop to check the changes of the files
time.sleep(5)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
# Create queues
#submit task
for task in TASKS:
task_queue.put(task)
for i in range(NUMBER_OF_PROCESSES):
p = Process(target=worker, args=(task_queue, done_queue)).start()
# try:
# p = Process(target=worker, args=(task_queue))
# p.start()
# except:
# pass
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
# single_thread()
if __name__ == '__main__':
# freeze_support()
multiple_thread()
Справка:
- изменить файл монитора с помощью python script: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
- Multiprocessing:
https://docs.python.org/2/library/multiprocessing.html