Подтвердить что ты не робот

Paramiko Не удается загрузить большие файлы> 1GB

def download():
if os.path.exists( dst_dir_path ) == False:
    logger.error( "Cannot access destination folder %s. Please check path and permissions. " % ( dst_dir_path ))
    return 1
elif os.path.isdir( dst_dir_path ) == False:
    logger.error( "%s is not a folder. Please check path. " % ( dst_dir_path ))
    return 1

file_list = None
#transport = paramiko.Transport(( hostname, port)) 
paramiko.util.log_to_file('paramiko.log')
ssh = paramiko.SSHClient() 
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
#transport
try:
    ssh.connect( hostname, username=username, password=password, timeout=5.0) 
    #transport.connect(username=username, password=password ) 
except Exception, err:
    logger.error( "Failed to connect to the remote server. Reason: %s" % ( str(err) ) )
    return 1

try:
    #sftp = paramiko.SFTPClient.from_transport(transport)
    sftp = ssh.open_sftp() 
except Exception, err:
    logger.error( "Failed to start SFTP session from connection to %s. Check that SFTP service is running and available. Reason: %s" % ( hostname, str(err) ))
    return 1

try:    
    sftp.chdir(src_dir_path)
    #file_list = sftp.listdir(path="%s" % ( src_dir_path ) )
    file_list = sftp.listdir()

except Exception, err:
    logger.error( "Failed to list files in folder %s. Please check path and permissions. Reason: %s" % ( src_dir_path, str(err) ))
    return 1
match_text = re.compile( file_mask )
download_count = 0
for file in file_list:         
    # Here is an item name... but is it a file or directory?         
    #logger.info( "Downloading file %s." % ( file ) )
    if not re.match( file_mask, file ):
        continue
    else:
        logger.info( "File \"%s\" name matched file mask \"%s\". matches %s.Processing file..." % ( file, file_mask, (match_text.match( file_mask ) ) ) )
    src_file_path = "./%s" % ( file )
    dst_file_path = "/".join( [ dst_dir_path, file]   )
    retry_count = 0
    while True:
        try:
            logger.info( "Downloading file %s to %s."  % ( file, dst_file_path ) )
            #sftp.get( file, dst_file_path, callback=printTotals ) #sftp.get( remote file, local file )
            sftp.get( file, dst_file_path) #sftp.get( remote file, local file )
            logger.info( "Successfully downloaded file %s to %s."  % ( file, dst_file_path ) )
            download_count += 1
            break
        except Exception, err:
            if retry_count == retry_threshold:
                logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
                sftp.close() 
                #transport.close()
                return 1
            else:
                logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
                retry_count +=1

sftp.close() 
transport.close() 
logger.info( "%d files downloaded." % ( download_count ) )
return 0

Когда я запускаю функцию ниже, она загружает исходный файл примерно на 3 минуты, а затем закрывает сеанс, хотя загружается только 38-41 МБ (изменяется) файла 1-1.6 ГБ.

Из файла журнала Paramiko, похоже, что соединение SSh остается открытым, пока сеанс SFTP закрывается:

DEB [20120913-10: 05: 00.894] thr = 1 paramiko.transport: переключиться на новые ключи... DEB [20120913-10: 05: 06.953] thr = 1 paramiko.transport: Rekeying (получено 401 пакетов, получено 1053444 байт) DEB [20120913-10: 05: 07.391] thr = 1 paramiko.transport: kex algos: ['diffie-hellman-group1-sha1', 'diffie-hellman-group-exchange-sha1'] ключ сервера: ['ssh- dss '] client encrypt: [' aes256-ctr ',' aes192-ctr ',' aes128-ctr ',' aes256-cbc ',' aes192-cbc ',' aes128-cbc ',' twofish-cbc ',' blowfish-cbc ',' 3des-cbc ',' arcfour '] server encrypt: [' aes256-ctr ',' aes192-ctr ',' aes128-ctr ',' aes256-cbc ',' aes192-cbc ',' Mac OS: "hmac-sha1", "hmac-sha1-96", "hmac-md5", "cfc-cbc", "blowfish-cbc", "3ds-cbc", "arcfour" ], 'hmac-md5-96', '[email protected]'] сервер mac: ['hmac-sha1', 'hmac-sha1-96', 'hmac-md5', 'hmac-md5-96', '[email protected]'] клиент: ['[email protected]', 'zlib', 'none'] серверный сжимать: ['[email protected]', 'zlib', 'none' ] client lang: [''] server lang: [''] kex следует? False DEB [20120913-10: 05: 07.421] thr = 1 paramiko.transport: Чиперы согласились: local = aes128-ctr, remote = aes128-ctr DEB [20120913-10: 05: 07.421] thr = 1 paramiko.transport: использование kex diffie-hellman-group1-sha1; тип ключа сервера ssh-dss; шифр: локальный aes128-ctr, удаленный aes128-ctr; mac: локальный hmac-sha1, удаленный hmac-sha1; сжатие: локальный нет, удаленный нет DEB [20120913-10: 05: 07.625] thr = 1 paramiko.transport: перейти на новые ключи... INF [20120913-10: 05: 10.374] thr = 2 paramiko.transport.sftp: [chan 1] sftp session закрыт. DEB [20120913-10: 05: 10.388] thr = 2 paramiko.transport: [chan 1] EOF отправлено (1)

После этой точки script завершает работу с этим исключением (из блока try/except sftp.get())

Недостаточно ресурсов для завершения запроса

Сама система имеет гигабайты свободного места на диске, так что это не проблема.

Та же самая передача, с которой работает parakmiko, работает отлично с FileZilla и Java-приложением, которые я написал много лет назад для передачи SFTP. Поэтому я думаю, что это проблема с парамико.

Это выполняется в Windows XP и Windows Server 2003.

Я пробовал патчи Paramko 1.17, чтобы он чаще обновлял ключи, но передача все равно бросает исключение. Python 2.7.3 Paramiko 1.7 с патчем Windows 2003 Sevfer

Идеи?

Дополнительная информация: Он терпит неудачу на Windows XP SP3 и Windows 2003, точно такие же поведение и сообщения об ошибках. Информация о sys.version Windows XP Workstation: "2.7.3 (по умолчанию, 10 апреля 2012, 23:31:26) [MSC v.1500 32 бит (Intel)] ' Windows 2003 Server: "2.7.3 (по умолчанию, 10 апреля 2012, 23:31:26) [MSC v.1500 32 бит (Intel)] ' Я заплатил файл packet.py, чтобы сократить время между продлениями ключей. Это не повлияло на поведение sftp.get().

4b9b3361

Ответ 1

Протокол SFTP не имеет возможности передавать данные файла; вместо этого у него есть способ запросить блок данных с определенного смещения в открытом файле. Наивный способ загрузки файла состоял в том, чтобы запросить первый блок, записать его на диск, затем запросить второй блок и так далее. Это надежный, но очень медленный.

Вместо этого у Paramiko есть трюк производительности, который он использует: когда вы вызываете .get(), он немедленно отправляет запрос для каждого блока в файле и запоминает, какое смещение они должны быть записаны. Затем, когда каждый ответ приходит, он гарантирует, что он будет записан в правильное смещение на диске. Для получения дополнительной информации см. Методы SFTPFile.prefetch() и SFTPFile.readv() в документации Paramiko. Я подозреваю, что хранящаяся в нем информация о хранении, когда загрузка вашего 1GB файла может вызвать... что-то закончится из-за нехватки ресурсов, создавая сообщение "Недостаточно ресурсов".

Вместо того, чтобы использовать .get(), если вы просто вызываете .open(), чтобы получить экземпляр SFTPFile, затем вызовите .read() на этом объекте или просто передайте его в стандартную библиотечную функцию Python shutil.copyfileobj(), чтобы загрузить содержание. Это должно избежать кэша предварительной выборки Paramiko и позволить вам загружать файл, даже если он не так быстро.

i.e:

 def lazy_loading_ftp_file(sftp_host_conn, filename):
    """
        Lazy loading ftp file when exception simple sftp.get call
        :param sftp_host_conn: sftp host
        :param filename: filename to be downloaded
        :return: None, file will be downloaded current directory
    """
    import shutil
    try:
        with sftp_host_conn() as host:
            sftp_file_instance = host.open(filename, 'r')
            with open(filename, 'wb') as out_file:
                shutil.copyfileobj(sftp_file_instance, out_file)
            return {"status": "sucess", "msg": "sucessfully downloaded file: {}".format(filename)}
    except Exception as ex:
        return {"status": "failed", "msg": "Exception in Lazy reading too: {}".format(ex)}

Ответ 2

У меня была очень похожая проблема, в моем случае файл только ~ 400 МБ, но он будет последовательно терпеть неудачу после загрузки около 35 МБ или около того. Он не всегда терпел неудачу при том же количестве загруженных байтов, но где-то около 35 - 40 МБ, файл прекратил бы передачу, а через минуту я получил бы "Недостаточно ресурсов для завершения запроса".

Загрузка файла через WinSCP или PSFTP работала нормально.

Я попробовал метод Screwtape, и он действительно работал, но был очень медленным. Мой 400-мегабайтный файл был в темпе, чтобы взять что-то вроде 4 часов для загрузки, что было неприемлемым таймфреймом для этого конкретного приложения.

Кроме того, когда-то, когда мы впервые установили это, все работало нормально. Но администратор сервера внес некоторые изменения в SFTP-сервер и что, когда что-то сломалось. Я не уверен, какие изменения были внесены, но поскольку он все еще работал нормально с использованием WinSCP/других методов SFTP, я не думал, что это будет полезно, если вы попытаетесь атаковать это со стороны сервера.

Я не собираюсь притворяться, что понимаю, почему, но вот что в итоге работает для меня:

  • Я загрузил и установил текущую версию Paramiko (1.11.1 в это время). Первоначально это не имело никакого значения, но я решил, что упомянул об этом на всякий случай, если это будет частью решения.

  • Трассировка стека для исключения:

    File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 676, in get
        size = self.getfo(remotepath, fl, callback)
    File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 645, in getfo
        data = fr.read(32768)
    File "C:\Python26\lib\site-packages\paramiko\file.py", line 153, in read
        new_data = self._read(read_size)
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 157, in _read
        data = self._read_prefetch(size)
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 138, in _read_prefetch
        self._check_exception()
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 483, in _check_exception
        raise x
    
  • Сотрясая немного в sftp_file.py, я заметил это (строки 43-45 в текущей версии):

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768
    
  • По прихоти, я попытался изменить MAX_REQUEST_SIZE на 1024 и, вот и вот, я смог загрузить весь файл!

  • После того как я заработал, изменив MAX_REQUEST_SIZE на 1024, я попробовал кучу других значений между 1024 и 32768, чтобы увидеть, повлияло ли это на производительность или что-то еще. Но я всегда получал ошибку раньше или позже, когда значение было значительно больше, чем 1024 (1025 было в порядке, но 1048 в итоге не удалось).

Ответ 3

В дополнение к ответу Screwtape также стоит упомянуть, что вам следует, вероятно, ограничить размер блока .read([block size in bytes])

Смотрите ленивый метод чтения большого файла

У меня были реальные проблемы только с file.read() без размера размера блока в 2.4, однако 2.7 определяет правильный размер блока.

Ответ 4

Я пытаюсь проследить код в paramiko, теперь я уверен, что это проблема сервера.

1. Проделанная предварительная выборка

Чтобы увеличить скорость загрузки, paramiko попытается выполнить предварительную выборку с помощью метода fetch. Когда вызывается метод SFTP_FILE.prefetch(), создается новый поток и т.е. запрос на выборку отправляется на сервер, используя весь файл.
мы можем найти это в файле paramiko/sftp_file.py вокруг строки 464.

2. Как убедиться в проблеме сервера

Указанный выше запрос запускается в асинхронном режиме. SFTP_FILE._async_response() используется для получения ответа от сервера async.And отслеживает код, мы можем найти, что это исключение создается в методе SFTP_FILE._async_response(), который конвертируется из сообщения, отправленного с сервера. Теперь мы уверены, что это исключение из сервера.

3. Как решить проблему

Поскольку у меня нет доступа к серверу, поэтому использовать sftp в командной строке - это мой лучший выбор. Но, с другой стороны, теперь мы знаем, что слишком много запросов заставляет сервер сбой, поэтому мы можем заснуть при отправке запрос на сервер.

Ответ 5

Я использую этот тип сценария с paramiko для больших файлов, вы можете поиграться с window_size/packet size чтобы увидеть, что работает лучше для вас, если вы хотите, чтобы он был более производительным, вы можете запускать параллельные процессы для чтения различных фрагментов файлов в параллельно с использованием второго метода (см. http://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_file.SFTPFile.readv)

import time, paramiko

MAX_RETRIES = 10

ftp_server = "ftp.someserver.com"
port = 22
sftp_file = "/somefolder/somefile.txt"
local_file = "/somefolder/somewhere/here.txt"
ssh_conn = sftp_client = None
username = "username"
password = "password"

start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        #method 1 using sftpfile.get and settings window_size, max_packet_size
        window_size = pow(4, 12)#about ~16MB chunks
        max_packet_size = pow(4, 12)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn, window_size=window_size, max_packet_size=max_packet_size)
        sftp_client.get(sftp_file, local_file)
        #method 2 breaking up file into chunks to read in parallel
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        filesize = sftp_client.stat(sftp_file).st_size
        chunksize = pow(4, 12)#<-- adjust this and benchmark speed
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        with sftp_client.open(sftp_file, "rb") as infile:
            with open(local_file, "wb") as outfile:
                for chunk in infile.readv(chunks):
                    outfile.write(chunk)
        break
    except (EOFError, paramiko.ssh_exception.SSHException) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry - 1) * 10)
        #back off in steps of 10, 20.. seconds 
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))

Если вы действительно обеспокоены производительностью, вы можете запустить второй метод и разбить файл на несколько процессов/потоков, здесь пример кода с использованием многопоточности, который записывает несколько частей файла, а затем объединяет их в один файл.

import threading, os, time, paramiko

#you could make the number of threads relative to file size
NUM_THREADS = 4
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    '''creates filepart path from filepath'''
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(sftp_file, "rb") as infile:
                with open(local_file_part, "wb") as outfile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry - 1) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        #connect to get the file size in order to calculate chunks
        filesize = sftp_client.stat(sftp_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(4, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        #break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            local_file_part = make_filepart_path(local_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(local_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        #join file parts into one file, remove fileparts
        with open(local_file, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry - 1) * 10)
    finally:
       if hasattr(sftp_client, "close") and callable(sftp_client.close):
           sftp_client.close()
       if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
           ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))

Ответ 6

Я столкнулся с проблемами при загрузке больших файлов (> 1 ГБ) через SFTP с использованием pysftp. Базовая библиотека - Парамико. Погуглите насчет проблемы, приведите меня сюда и есть отличные решения. Тем не менее, многие посты являются относительно старыми, и я полагаю, что большинство из этих проблем были решены с течением времени. И это не помогло с моей проблемой.

А именно: Paramiko сталкивается с ошибкой памяти при загрузке фрагментов во время предварительной выборки в sftp_file.py. Список расширяется за пределы и ошибка памяти как-то не блокирует выполнение. Вероятно, он молча использовался в стеке. Загрузка завершается неудачно только при возникновении этой ошибки, и они запускаются в отдельных потоках.

В любом случае, способ контролировать размер списка - установить MAX_REQUEST_SIZE:

paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22) # 4MB per chunk

Однако, если вы превысите 16 МБ, вы столкнетесь с новой проблемой: paramiko.sftp.SFTPError: Получен пакет мусора. Оказывается, есть проверка sftp.py в методе _read_packet:

# most sftp servers won't accept packets larger than about 32k, so
# anything with the high byte set (> 16MB) is just garbage.
if byte_ord(x[0]):
    raise SFTPError("Garbage packet received")

Итак, если порция составляет> 16 МБ, у нас возникает эта ошибка. Я не хотел возиться с самой библиотекой Paramiko, поэтому мне пришлось сохранять размер куска на "приемлемом максимуме" в 4 МБ.

Таким образом, я смог загрузить файлы размером> 30 ГБ. Надеюсь, что это помогает людям.