
I am running some performance tests on transferring large files (~4 GB) from an FTPS server to an SFTP server. After some research I put together a Python script to see whether there is any performance improvement to be had when getting a file from FTPS and pushing it to SFTP.

FTPS connection setup

import ftplib
import socket
from io import BytesIO
from datetime import datetime

def create_connection(self):
    print('Creating session..........')
    ftp = ftplib.FTP_TLS()
    # ftp.set_debuglevel(2)
    ftp.connect(self.host, self.port)
    ftp.login(self.user, self.passwd)
    ftp.prot_p()  # protect the data connection with TLS
    # tune socket params for the download task
    print('Optimizing socket..........')
    ftp.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
    ftp.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
    print('Session created successfully')
    return ftp

def get_file(self, ftp_session, dst_filename, local_filename):
    print('Starting download........', datetime.now())
    myfile = BytesIO()
    print(myfile.tell())
    # retrbinary invokes myfile.write once per received block
    ftp_session.retrbinary('RETR %s' % dst_filename, myfile.write)
    print(myfile.tell())
    print('Download completed ........', datetime.now())
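An easy knob on the FTPS side: retrbinary reads the data socket in 8 KiB blocks by default, so fewer, larger blocks can noticeably cut per-block overhead on a fast link. A one-line variant (the 256 KiB figure is a starting point to benchmark, not a measured optimum):

# default blocksize is 8192; larger blocks mean fewer socket reads and
# fewer callback invocations over a 4 GB file
ftp_session.retrbinary('RETR %s' % dst_filename, myfile.write, blocksize=256 * 1024)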

For the SFTP connection I am using paramiko:

import paramiko

host, port = "abc.com", 22
transport = paramiko.Transport((host, port))
username, password = "user", "pwd"
transport.connect(None, username, password)
transport.default_window_size = 3 * 1024 * 1024
sftp = paramiko.SFTPClient.from_transport(transport)
myfile.seek(0)
sftp.putfo(fl=myfile, remotepath='remotepath/' + local_filename)
sftp.close()
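Incidentally, paramiko (1.15+) also accepts the window and packet sizes as arguments when the SFTP channel is created, which avoids mutating the transport's default after the fact. A small variant of the snippet above, with the same placeholder credentials:

sftp = paramiko.SFTPClient.from_transport(
    transport,
    window_size=3 * 1024 * 1024,   # same 3 MiB window as above
    max_packet_size=256 * 1024,    # larger SFTP packets can help on fast links
)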

I am using BytesIO so that I can keep the file in memory and stream it while copying. The code above does copy the file, but the whole transfer takes ~20 minutes: it first pulls the entire file into memory and only then starts the upload. Is there any way to transfer the file more efficiently?
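One way to avoid the double buffering is to stream each downloaded block straight into an open paramiko file handle, so the FTPS download and the SFTP upload overlap and memory use stays flat. A minimal sketch (untested; the hosts, credentials and paths are placeholders):

import ftplib
import paramiko

ftp = ftplib.FTP_TLS()
ftp.connect('ftps.example.com', 21)
ftp.login('user', 'pwd')
ftp.prot_p()

transport = paramiko.Transport(('sftp.example.com', 22))
transport.connect(None, 'user', 'pwd')
sftp = paramiko.SFTPClient.from_transport(transport)

with sftp.open('remotepath/bigfile.bin', 'wb') as remote:
    remote.set_pipelined(True)  # don't wait for a server ack after every write
    # retrbinary accepts any callable, so each block goes straight to SFTP
    ftp.retrbinary('RETR bigfile.bin', remote.write, blocksize=256 * 1024)

sftp.close()
transport.close()
ftp.quit()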

1 Answer


After some short Google searches (yes, Google really does work =) I stumbled across this thread:

Paramiko Fails to download large files >1GB

import threading, os, time, paramiko

# Placeholder connection details and paths -- the linked thread defines these
# elsewhere; fill in your own values. Despite the name, ftp_server here is
# the SFTP host.
ftp_server = "sftp.example.com"
port = 22
username, password = "user", "pwd"
sftp_file = "/remote/path/bigfile.bin"
local_file = "/local/path/bigfile.bin"

# you could make the number of threads relative to file size
NUM_THREADS = 4
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(sftp_file, "rb") as infile:
                with open(local_file_part, "wb") as outfile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            # reassigning the loop variable has no effect, so count from 1 here
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry + 1))
            time.sleep((retry + 1) * 10)  # simple linear backoff
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        filesize = sftp_client.stat(sftp_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(4, 12)  # 4**12 = 16 MiB per chunk
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i + thread_chunk_size] for i in range(0, len(chunks), thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            local_file_part = make_filepart_path(local_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(local_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with open(local_file, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        print("%s %s - > retrying %s..." % (type(x), x, retry + 1))
        time.sleep((retry + 1) * 10)
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))