
Database-Logger-PyODBC

Overview

This repository contains a Python implementation of a logging system that supports:

  1. Database Logging: Inserts log entries into a SQL Server database.
  2. File-Based Logging: Maintains error logs in a local file as a backup mechanism.
  3. Automatic Retry for Failed Logs: Replays log entries that previously failed, moving them from the local log file back into the database.

Features

  • Database Integration:

    • Logs critical events directly into a SQL Server database using pyodbc.
    • Provides detailed logging fields such as timestamp, task ID, log type, status, and error details.
  • File Backup:

    • Writes logs to a local file (dbErrorLog.txt) in case of database connection issues.
    • Retries inserting failed log entries from the file into the database (the on-disk format is sketched just after this list).
  • Error Handling:

    • Catches and logs errors encountered during database operations.
    • Ensures no logs are lost by falling back to file-based storage.
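
When a database insert fails, the code below queues the insert's parameter tuple as one JSON line in dbErrorLog.txt, and the retry pass replays each line through the same parameterized query. A minimal sketch of that round trip (the values shown are hypothetical):

    import json

    # One failed insert, serialized as a single line in dbErrorLog.txt
    params = ("2025-01-15 10:30:00", 123, "Error", "ERR123", "Failed",
              "User remarks", "2025-01-15 10:30:00", "Error message", "Some details")
    line = json.dumps(params)

    # The retry pass reads the line back and re-binds the parameters
    restored = json.loads(line)
    assert restored[1] == 123  # Task_ID survives the round trip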

Prerequisites

  1. Python 3.8+.
  2. Required Python packages:
    • pyodbc (install with pip install pyodbc; the logging and json modules used below ship with the Python standard library)
  3. SQL Server database with a table named Daily_Task_Logs having the following schema:
    CREATE TABLE Daily_Task_Logs (
        Log_TimeStamp DATETIME,
        Task_ID INT,
        Log_Type NVARCHAR(50),
        Log_Code NVARCHAR(50),
        Error_Current_Status NVARCHAR(50),
        Log_Remarks_User NVARCHAR(MAX),
        Last_Error_Status_TimeStamp DATETIME,
        Log_Remarks_System NVARCHAR(MAX),
        Log_Details NVARCHAR(MAX)
    );
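
If the table does not exist yet, it can be created from Python with the same pyodbc connection string the logger uses. A minimal sketch (server, database, and credentials are placeholders to replace with your own):

    import pyodbc

    # Placeholder credentials; replace with your own server details
    conn = pyodbc.connect("DRIVER={SQL Server};SERVER=xx.xx.xx.xx;"
                          "DATABASE=xxxxxxxx;UID=xxxxxxxx;PWD=xxxxxxxx;")
    cursor = conn.cursor()
    cursor.execute("""
        IF OBJECT_ID('Daily_Task_Logs', 'U') IS NULL
        CREATE TABLE Daily_Task_Logs (
            Log_TimeStamp DATETIME,
            Task_ID INT,
            Log_Type NVARCHAR(50),
            Log_Code NVARCHAR(50),
            Error_Current_Status NVARCHAR(50),
            Log_Remarks_User NVARCHAR(MAX),
            Last_Error_Status_TimeStamp DATETIME,
            Log_Remarks_System NVARCHAR(MAX),
            Log_Details NVARCHAR(MAX)
        )
    """)
    conn.commit()
    conn.close()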


Code:

import os
import json  # Serializes failed-insert parameters for the retry pass
import pyodbc  # pyodbc for the SQL Server connection
from datetime import datetime
import logging

class Logger:
    # Parameterized insert shared by first-attempt logging and the retry pass
    LOG_INSERT_QUERY = """
        INSERT INTO Daily_Task_Logs
        (Log_TimeStamp, Task_ID, Log_Type, Log_Code, Error_Current_Status, Log_Remarks_User,
         Last_Error_Status_TimeStamp, Log_Remarks_System, Log_Details)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    def __init__(self):
        # pyodbc connection string format for SQL Server (placeholder credentials)
        self.connection_string = "DRIVER={SQL Server};SERVER=xx.xx.xx.xx;DATABASE=xxxxxxxx;UID=xxxxxxxx;PWD=xxxxxxxx;"
        self.documents_path = os.path.expanduser('~')  # Home directory path
        self.log_folder_path = os.path.join(self.documents_path, "Log_Folder")
        # dbErrorLog.txt is the retry queue; appErrorLog.txt holds diagnostics
        self.log_file_path = os.path.join(self.log_folder_path, "dbErrorLog.txt")
        self.app_log_file_path = os.path.join(self.log_folder_path, "appErrorLog.txt")
        self.setup_logger()

    def setup_logger(self):
        # Ensure the log folder exists
        os.makedirs(self.log_folder_path, exist_ok=True)

        # Route diagnostic messages to a separate file so they are never
        # mistaken for retryable entries in dbErrorLog.txt
        logging.basicConfig(filename=self.app_log_file_path, level=logging.ERROR,
                            format='%(asctime)s %(levelname)s: %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')

    def log_to_database(self, tskID, logType, logCode, currentStatus, logRemarksUser, logMessage, logDetails):
        getDate = datetime.now()
        lastErrorStatusTimestamp = getDate if currentStatus != "NA" else datetime(1900, 1, 1, 12, 0, 0)

        # Strip control characters from the message; parameter binding handles quoting
        logMessage = logMessage.replace("\r", "").replace("\n", "").replace("\t", "").replace("\v", "")

        params = (
            getDate.strftime("%Y-%m-%d %H:%M:%S"),
            tskID,
            logType,
            logCode,
            currentStatus,
            logRemarksUser,
            lastErrorStatusTimestamp.strftime("%Y-%m-%d %H:%M:%S"),
            logMessage,
            logDetails,
        )

        try:
            # Connect to SQL Server and run the parameterized insert
            connection = pyodbc.connect(self.connection_string)
            cursor = connection.cursor()
            cursor.execute(self.LOG_INSERT_QUERY, params)
            connection.commit()
            connection.close()

            # The database is reachable again, so replay any queued entries
            self.old_log_file_input()

        except Exception as e:
            # Queue the parameters as one JSON line so the retry pass can replay them
            self.log_to_file(json.dumps(params))
            logging.error(f"Error in log_to_database: {str(e)}")

    def old_log_file_input(self):
        # Replay inserts that previously failed and were queued as JSON lines
        if not os.path.exists(self.log_file_path):
            return

        connection = pyodbc.connect(self.connection_string)
        cursor = connection.cursor()

        with open(self.log_file_path, 'r') as file:
            lines = file.readlines()

        failed_lines = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                # Each line is the JSON-encoded parameter tuple of one failed insert
                params = json.loads(line)
                cursor.execute(self.LOG_INSERT_QUERY, params)
                connection.commit()
            except Exception:
                failed_lines.append(line)

        # Rewrite the file so it keeps only the entries that still failed;
        # truncating on success prevents duplicate inserts on the next pass
        with open(self.log_file_path, 'w') as file:
            if failed_lines:
                file.write("\n".join(failed_lines) + "\n")

        connection.close()

    def log_to_file(self, log_message):
        # Append one line to the local backup file (the retry queue)
        try:
            with open(self.log_file_path, 'a') as log_file:
                log_file.write(f"{log_message}\n")
        except Exception as e:
            logging.error(f"Error writing to log file: {str(e)}")



# Example usage:
if __name__ == "__main__":
    logger = Logger()
    logger.log_to_database(
        tskID=123,
        logType="Error",
        logCode="ERR123",
        logDetails="Some error details",
        logMessage="Error message",
        currentStatus="Failed",
        logRemarksUser="User remarks"
    )
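
A quick way to confirm the insert landed is to read the newest rows back over the same connection; a short check, assuming the table and placeholder credentials above:

    import pyodbc

    conn = pyodbc.connect("DRIVER={SQL Server};SERVER=xx.xx.xx.xx;"
                          "DATABASE=xxxxxxxx;UID=xxxxxxxx;PWD=xxxxxxxx;")
    cursor = conn.cursor()
    cursor.execute("""
        SELECT TOP 5 Log_TimeStamp, Task_ID, Log_Type, Log_Code, Error_Current_Status
        FROM Daily_Task_Logs
        ORDER BY Log_TimeStamp DESC
    """)
    for row in cursor.fetchall():
        print(row)
    conn.close()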
