SFTP, PostgreSQL, Python - the story of an ETL


Some data is delivered to me daily over SFTP as a "CSV" file that needs to be imported into a PostgreSQL database. Unsurprisingly, the so-called CSV file doesn't fully respect RFC 4180, so there is some cleanup to do.

In my case, the CSV file is like this:

"First Name";"Last Name";"Birthdate";"Nationality"

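Some interesting points: the fields are separated by semicolons, the first line is a header, and empty fields are written as "" although they should end up as NULL in the database.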

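Hence the following program will: connect to the SFTP server with a private key, look for files matching a pattern in the inbox directory, truncate the target table, and stream each file directly into PostgreSQL with COPY, rewriting empty fields to null on the fly.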

This setup is quite efficient and can transfer hundreds of thousands of records per second. The only prerequisite is that the database table already exists (for example, created manually beforehand).

The code

# -*- coding: utf-8 -*-

import paramiko
import psycopg2
import psycopg2.extras
from tendo import singleton

import os
import inspect
import re
import sys


# --- SFTP source settings ---------------------------------------------------
SSH_HOST = 'sftp-server.example.com'
# Port used to build the paramiko Transport below. It was referenced but never
# defined, which raised NameError; 22 is the standard SSH/SFTP port.
SSH_PORT = 22
SSH_USERNAME = 'sftp_user'
# Private key "ssh.key", resolved relative to the directory of this script so
# the program works regardless of the current working directory.
SSH_KEYFILE = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile(inspect.currentframe()))[0], "ssh.key")))
# Remote directory that is scanned for incoming files.
SSH_DIR = 'inbox'
# Regular expression (applied with re.match) selecting the files to import.
# Raw string so that the regex escape "\." is not treated as a string escape.
SSH_MASK = r'file_pattern.*\.csv'

# --- PostgreSQL target settings ---------------------------------------------
DSN = "dbname='template1' user='dbuser' host='localhost' password='dbpass'"
# Table that is truncated and re-filled on import; it must already exist.
TABLE_NAME = 'table_name'

me = singleton.SingleInstance()     # will sys.exit(-1) if other instance is running

conn = psycopg2.connect(DSN)
cur = conn.cursor()
# Use PostgreSQL's 'German' date style for this session
# (presumably the dates in the incoming file are written that way - verify).
cur.execute("SET datestyle='German'")

# NOTE(review): this SSHClient is not used anywhere in the visible part of the
# script (the Transport below is used instead); kept in case later code needs it.
ssh = paramiko.SSHClient()

# Low-level SSH transport; authentication happens later via t.connect().
t = paramiko.Transport((SSH_HOST, SSH_PORT))

class RowsIO(object):
    """Read-only file wrapper that turns empty quoted CSV fields into ``null``.

    The wrapped object is consumed one line per ``read()`` call, and every
    ``;"";`` in that line is replaced by ``;null;`` so that a
    ``COPY ... NULL 'null'`` import stores those fields as NULL.

    Both text (``str``) and binary (``bytes``) sources are supported; the
    returned line has the same type as the one produced by the source.

    Note: only empty fields that have a delimiter on *both* sides are
    rewritten; an empty field at the very start or end of a line is left
    untouched.
    """

    def __init__(self, fobj):
        """Wrap *fobj*, a file-like object that provides ``readline()``."""
        self.fobj = fobj

    def read(self, size=-1):
        """Return the next line of the source with empty fields nulled.

        *size* is accepted for file-API compatibility but ignored: exactly
        one line is returned per call, whatever its length.  An empty
        string (or empty bytes for a binary source) signals end of file.
        """
        line = self.fobj.readline()
        is_binary = isinstance(line, bytes)
        if not line:
            return b"" if is_binary else ""

        # Pick markers matching the type of the data so that binary sources
        # (e.g. an SFTP file opened with 'rb') work on Python 3 as well.
        if is_binary:
            empty_field, null_field = b';"";', b';null;'
        else:
            empty_field, null_field = ';"";', ';null;'

        # Loop instead of a single replace(): consecutive empty fields
        # (;"";"";) share a delimiter, so one pass only fixes every other one.
        while empty_field in line:
            line = line.replace(empty_field, null_field)
        return line

    def readline(self, size=-1):
        """Not supported; consumers are expected to call ``read()`` only."""
        raise NotImplementedError

# Authenticate on the transport with the RSA private key.
# NOTE(review): no host key is passed to connect(), so the server's identity
# is not verified - consider supplying hostkey= if that matters here.
key = paramiko.RSAKey.from_private_key_file(SSH_KEYFILE)

t.connect(username=SSH_USERNAME, pkey=key)

sftp = t.open_sftp_client()

# Import every file of the remote directory whose name matches SSH_MASK.
# The table is truncated before each file, so it ends up holding the content
# of the last matching file.
for filename in sftp.listdir(SSH_DIR):
    if re.match(SSH_MASK, filename):
        # Build the path relative to the same base directory listdir() used;
        # a hard-coded leading "/" only works when the login directory is "/".
        path = '%s/%s' % (SSH_DIR, filename)
        fobj = sftp.file(path, 'rb')
        try:
            cur.execute('TRUNCATE TABLE %s' % TABLE_NAME)
            # Stream the remote file straight into COPY, rewriting empty
            # fields to "null" on the fly (see RowsIO).
            fobj2 = RowsIO(fobj)
            # COPY ... FROM STDIN is the documented form for client-fed data.
            cur.copy_expert("COPY %s FROM STDIN WITH DELIMITER ';' NULL 'null' CSV HEADER" % (TABLE_NAME, ), fobj2)
        finally:
            # Always release the remote file handle, even if the import fails.
            fobj.close()
        # psycopg2 runs inside a transaction by default: without a commit the
        # TRUNCATE + COPY would be rolled back when the connection closes.
        # Committing here makes the replacement of the table content atomic
        # per file; on error the exception propagates before this line.
        conn.commit()