Skip to main content

Data-Automation-SFTP-Database

Data-Automation-SFTP-Database

Overview

This repository contains a Python script for automating data workflows involving:

  1. SFTP File Download: Securely connecting to an SFTP server to download files.
  2. Data Preprocessing: Cleaning and transforming data from CSV files, including handling numeric, currency, and datetime columns.
  3. Database Integration: Uploading cleaned data to an SQL database.
  4. Logging and Notifications: Comprehensive logging and email alerts for status updates and error reporting.

Features

  • SFTP Automation:

    • Securely connects to an SFTP server.
    • Downloads files based on specific criteria.
    • Archives downloaded files in a remote archive directory.
  • Data Cleaning:

    • Handles invalid and missing values (NULL, N/A, etc.).
    • Processes numeric and currency columns for database compatibility.
    • Parses and standardizes datetime fields.
  • Database Upload:

    • Uses SQLAlchemy for seamless database integration.
    • Supports MSSQL with ODBC Driver 17.
  • Error Handling & Notifications:

    • Logs errors and success messages to a custom logging system.
    • Sends email alerts for failures or process completion.

Prerequisites

  1. Python 3.8+
  2. Required Python packages:
    • pandas
    • sqlalchemy
    • pysftp
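    • pyodbc (needed by the mssql+pyodbc SQLAlchemy connection used in the script)
    • smtplib (part of the Python standard library; no installation needed)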
  3. Database with proper credentials for MSSQL connection.
  4. SMTP setup for sending emails.


Code:


import os
import pandas as pd
from datetime import datetime, timedelta
import pysftp
import smtplib
import posixpath
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.application import MIMEApplication
from email.mime.message import MIMEMessage
from email import encoders
from sqlalchemy import create_engine
import urllib
from urllib.parse import quote_plus

from Logger import Logger # Logger class is created to maintain our program log



# Initialize Logger
logger = Logger()

# Reference timestamps: "now" and the same moment one day earlier.
today_date = datetime.now()
previous_date = today_date - timedelta(days=1)

# Get the directory where the script is located
script_dir = os.path.dirname(os.path.abspath(__file__))

# Define the target folder
raw_data = os.path.join(script_dir, 'Raw_Data')

# Get the previous date in the format mmddyyyy
previousDate = previous_date.strftime('%m%d%Y')

# Full path of the folder named after the previous day's date
raw_data_folder = os.path.join(raw_data, previousDate)

# Create the folder (and any missing parents). exist_ok=True makes this a
# no-op when the folder is already there and avoids the check-then-create
# race of a separate os.path.exists() test.
os.makedirs(raw_data_folder, exist_ok=True)

# NOTE: printed on every run, whether or not the folder was newly created.
print(f"Folder created at: {raw_data_folder}")


# Function to send log email
def send_email(subject, body, success=True):
    """Send a plain-text e-mail via smtp.office365.com:587 using STARTTLS.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.
        success: Accepted for call-site compatibility; not read by this
            function.

    Any exception raised while connecting, logging in or sending is not
    propagated; it is recorded through ``logger.log_to_database`` instead.
    """
    sender_email = "xxxxxxxx@xxxxxxxx.com"
    password = "*******"
    receiver_email = "xxxxxxxx@xxxxxxxx.com"

    # Build the message: headers first, then the plain-text body part.
    message = MIMEMultipart()
    headers = (
        ('From', sender_email),
        ('To', receiver_email),
        ('Subject', subject),
    )
    for header_name, header_value in headers:
        message[header_name] = header_value
    message.attach(MIMEText(body, 'plain'))

    # Deliver the message; failures are logged rather than raised.
    try:
        with smtplib.SMTP('smtp.office365.com', 587) as smtp_conn:
            smtp_conn.starttls()
            smtp_conn.login(sender_email, password)
            smtp_conn.sendmail(sender_email, receiver_email, message.as_string())
    except Exception as err:
        logger.log_to_database(0,"Error", "Email Failure", "Failed", "Email Failed", str(err), "Email Error")



# Define SFTP parameters
sftp_host = "xxxxxxx"
sftp_port = 22
sftp_username = "xxxxxxxx"
sftp_password = "xxxxxxx"
sftp_filepath = "/xxxx/xxxx/xxxx"
sftp_archivepath = "/xxxx/xxxx/xxxx/xxxx"

# Define SFTP connection options
cnopts = pysftp.CnOpts()
# SECURITY: host key checking is disabled here, so the server's identity is
# not verified. Load the server's host key instead before using this in
# production.
cnopts.hostkeys = None

# Connect to the SFTP server, download the first matching report into
# raw_data_folder and move the remote copy to the archive directory.
try:
    with pysftp.Connection(host=sftp_host, port=sftp_port, username=sftp_username, password=sftp_password, cnopts=cnopts) as sftp:
        print("Connection Successfully Established...")

        try:
            # Change to the remote directory and list its entries
            sftp.cwd(sftp_filepath)
            items = sftp.listdir()

            for item in items:
                # Only files whose name starts with "My_Report" are of interest
                if not item.startswith("My_Report"):  # Put your file name here
                    continue

                # Remote paths always use "/" (posixpath); local paths use the OS separator
                remote_item_path = posixpath.join(sftp_filepath, item)
                local_file_path = os.path.join(raw_data_folder, item)

                print(f"Found file: {item}. Downloading to: {local_file_path}")

                # Download the file
                sftp.get(remote_item_path, local_file_path)
                print(f"Downloaded file: {item}")

                # Move the remote file to the archive folder only after the download succeeded
                sftp_archive_file_path = posixpath.join(sftp_archivepath, item)
                sftp.rename(remote_item_path, sftp_archive_file_path)
                print(f"Moved SFTP file to archive: {sftp_archive_file_path}")

                # Log success and exit loop after downloading
                logger.log_to_database(0, "Info", "SFTP_File_Download", "Success", f"Downloaded file: {item}", "Success", "SFTP Download")
                break  # Assuming only one file is to be downloaded
            else:
                # Loop finished without a break: nothing matched. Say so instead of staying silent.
                print(f"No file starting with 'My_Report' found in {sftp_filepath}")

        except Exception as e:
            # Download/archive problems are reported but do not abort the script.
            logger.log_to_database(0, "Error", "SFTP Error", "Failed", "Failed to download to local directory", str(e), "SFTP Error")
            # Include the actual error so the e-mail is actionable.
            send_email("Error! File Not found or download", f"File not found, nor downloaded to local drive. Error: {str(e)}")
            print(f"An error occurred during  operation!: {str(e)}")

except Exception as e:
    logger.log_to_database(0, "Error", "SFTP Connection Error", "Failed", "Failed to connect to SFTP server", str(e), "SFTP connection Error")
    send_email("Error! Unable to connect to SFTP server for Data", f"Error: {str(e)}")
    print(f"An error occurred while connecting to the SFTP server: {str(e)}")
    # Bare raise re-raises the active exception with its original traceback.
    raise



# Collect the CSV files present in the local raw-data folder
csv_files = [file for file in os.listdir(raw_data_folder) if file.endswith('.csv')]

if not csv_files:
    # Nothing to process: report it and stop the script.
    print(f"No CSV files found in folder {raw_data_folder}")
    logger.log_to_database(0, "Warning", "No CSV Files Found", "No Data", "No CSV files found for processing.", "No Data", "File Check")
    send_email("No CSV Files Found", "No CSV files found in the folder for processing.")
    # SystemExit is what exit() raises, but it does not depend on the
    # site module having installed the interactive exit() helper.
    raise SystemExit

# Only the first CSV returned by os.listdir() is processed.
csv_file_path = os.path.join(raw_data_folder, csv_files[0])
try:
    # Read the CSV into a DataFrame
    df = pd.read_csv(csv_file_path)
except Exception as e:
    logger.log_to_database(0, "Error", "CSV Read Failure", "Failed", "Failed to read CSV file", str(e), "CSV Read Error")
    send_email("Error! CSV Read Failure", f"Failed to read CSV file {csv_file_path}. Error: {str(e)}")
    # Without a DataFrame the rest of the script cannot run; re-raise the
    # real error instead of failing later with NameError on `df`.
    raise
else:
    print(f"DataFrame shape: {df.shape}")



# Check if the DataFrame is empty
if df.empty:
    # Nothing to clean or upload: log it and notify by e-mail.
    logger.log_to_database(0,"Warning", "No Data", "No Data", "No Data in the downloaded file", "No Data", "Data checking")
    send_email("No Data in Folder", "The downloaded file is empty. No data insert into Database")
    print("No Data in the downloaded file. Email Sent.")

else:
    # Mapping of CSV column name -> database column name, for example
    # {'Employee Name': 'employee_name'}. It must be a dict: a set literal
    # such as {'a : b'} makes DataFrame.rename() fail with
    # "TypeError: 'set' object is not callable".
    rename_col = {'Put your CSV column name here': 'put_your_database_column_name_here'}

    # Rename DataFrame columns using mapping (keys not present in df are ignored)
    df.rename(columns=rename_col, inplace=True)

    # Columns to be converted to numbers
    numeric_column = ['Put your numeric column name here Like "Phone Numner : phone_number']

    # Columns holding decimal / currency values
    decimal_column = ['Put your float column name here Like "Total Cost : total_cost"']

    # Function to clean numeric columns
    def clean_numeric_columns(df, columns):
        for col in columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

    # Function to clean decimal (currency) columns
    def clean_currency_columns(df, columns):
        for col in columns:
            if col in df.columns:
                # Replace NULL, N/A, and other invalid entries with NaN
                df[col] = df[col].replace(['NULL', 'N/A', '#N/A', '#DIV/0!', '', ' ', '#NULL!', '#NUM!', '#REF!', '#VALUE!', '#NAME?'], pd.NA)
               
                # Remove '$', convert to numeric, and replace NaN with 0.0
                df[col] = df[col].replace('[\$,]', '', regex=True)
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0.0)


    # Clean numeric and decimal columns
    clean_numeric_columns(df, numeric_column)
    clean_currency_columns(df, decimal_column)


    # Define the columns that contain datetime data
    datetime_columns = ['Put your date time column name here']

    for col in datetime_columns:
        if col in df.columns:
            # Replaces 'N/A', empty strings (''), and spaces (' ') with pd.NaT. This standardizes the missing value format for easier processing.
            df[col] = df[col].replace(['N/A', '', ' '], pd.NaT)

            # Create a parsed column for each datetime field
            parsed_col = col + "_Parsed"

            # The regular expression .str.replace(r'\s(EDT|EST)$', '', regex=True) removes the timezone abbreviations from the end of the datetime strings.
            df[parsed_col] = df[col].str.replace(r'\s(EDT|EST)$', '', regex=True).str.strip()
            df[parsed_col] = pd.to_datetime(df[parsed_col], errors='coerce', format='%m/%d/%Y %I:%M:%S %p')

            # Fill NaT values with the default placeholder date "1900/01/01 00:00:00"
            df[parsed_col] = df[parsed_col].fillna(pd.Timestamp("1900-01-01 00:00:00"))


    # Define database connection
    try:
        username = 'xxxxxxxx'
        password = '********'
        password_encoded = urllib.parse.quote(password)
        server = 'xx.xx.xx.xx'
        database = 'xxxxxxxx'

        engine = create_engine(f'mssql+pyodbc://{username}:{password_encoded}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server')
       
        try:
            df.to_sql('Put your database table name here', con=engine, if_exists='append', index=False, chunksize=1000)

            logger.log_to_database(0,"Info", "Data Insert success", "Success", "data inserted successfully into DB", "Success", "Data Insertion")
            send_email("Data", "Data inserted successfully into Database.")
            print("Data inserted successfully into database.")

        except Exception as e:
            logger.log_to_database(0,"Error", "Data Insert Error", "Failed", "Error in data insertion", str(e), "Data Insertion Error")
            send_email("Error! Data", f"Error! - Data {str(e)}.")
            print("Error! Data Insertion")
       
    except Exception as e:
        send_email("Failed! Data Processing", f"Error! {str(e)}")
        raise e


Comments

Popular posts from this blog

MySQL Database connection with python using sqlalchemy

import pandas as pd import pymysql from sqlalchemy import create_engine import urllib.parse username = 'xxxxxxxxx' password = 'xxxxxxxxx' password_encoded = urllib.parse.quote(password) server = 'XX.XX.XX.XX' port = xxxx database = '' conn_string = f'mysql+pymysql://{username}:{password_encoded}@{server}:{port}/{database}' sql_query = ('select *'             ' from Phone_cloud.call_files_New'             ' limit 100;'             ) engine = create_engine(conn_string) df = pd.read_sql(sql_query, con=engine) df  

MySQL Table Data Export And Send To Email

MySQL Data Downloader and Emailer This project connects to a MySQL database, downloads data from specified tables, stores it in CSV files, and sends them via email. Prerequisites Python 3.x MySQL Connector for Python Pandas SMTP access for sending emails Setup Clone the repository: Navigate to the project directory: Install the required Python packages: Code:  import mysql.connector import pandas as pd import os from datetime import datetime import smtplib from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email import encoders # Database connection details db_config = {     'host': 'xx.xx.xx.xx',  # Replace with your MySQL host     'user': 'xxxxxxxx',  # Replace with your MySQL username     'password': 'xxxxxxxx',  # Replace with your MySQL password     'database': 'xxxxxxxx'  # Replace with your database name } # List of table names to query table...