Directory watcher in 5 Minutes

I'm a mechanical engineer specializing in NVH (Noise, Vibration, and Harshness). When we test a mechanical component, we sometimes have to take a lot of measurements over a long period of time. NVH measurements need a lot of disk space and, at the same time, high-speed write operations. This is why I decided to write a script that automatically copies or moves files from a given (local) directory to another (network) directory, depending on their age and on their file type or name.
To do that, I decided to use Python as the programming language and to rely only on its standard library, without any third-party packages. You can clone the project from https://github.com/tmeftah/dir-watcher
Step 1: create the app.py file and load the config file
import configparser

# allow_no_value=True lets the parser accept keys that are written without a value.
config = configparser.ConfigParser(allow_no_value=True)
# Load the settings from conf.cfg (a relative path, so it is looked up in the
# current working directory). NOTE: per the configparser documentation, read()
# silently skips files it cannot open; a missing conf.cfg only shows up later,
# when a setting is requested.
config.read('conf.cfg')
Create a conf.cfg file
[SETTING]
timeinterval =  10
src_path = /dir_to_watch/*.txt
dist_path = /dir_to_save_to/test
operation = copy
  • timeinterval = how old a file must be before it is processed (the script compares this value against the file's age in seconds)
  • src_path = the source path, including the file pattern to look for
  • dist_path = the destination folder that files are moved/copied to
  • operation = 'copy' to copy/paste or 'move' to cut/paste
Step 2: read the configuration variables
    # app.py
    .
    ..
    ...
    
    # Fetch the four watcher settings from the [SETTING] section of conf.cfg.
    # getint() is the parser's built-in shortcut for int(config.get(...)).
    timeinterval = config.getint('SETTING', 'timeinterval')
    src_path = config.get(section='SETTING', option='src_path')
    dist_path = config.get(section='SETTING', option='dist_path')
    operation = config.get(section='SETTING', option='operation')
    Step 3: search for files that match the "src_path" pattern
    # app.py
    .
    ..
    ...
    
    file_list = glob.glob(src_path) # every path that matches the src_path glob pattern
    The search pattern in src_path follows the rules of Python's "glob.glob" function. For more information, please see https://docs.python.org/3/library/glob.html
    Step 4: loop through all files and move/copy them
    # app.py
    .
    ..
    ...
    # Copy or move every matched file that is older than `timeinterval` seconds.
    for file in file_list:
        filename = os.path.basename(file)
        target = os.path.join(dist_path, filename)

        # Age of the file, based on os.path.getctime().
        file_datetime = datetime.fromtimestamp(os.path.getctime(file))
        age = datetime.now() - file_datetime

        # total_seconds() covers the whole timedelta. The .seconds attribute
        # only holds the part below one day, so a file that is e.g. one day
        # and three seconds old would wrongly look "too recent".
        if age.total_seconds() <= timeinterval:
            continue

        if operation == "copy":
            if os.path.isfile(target):
                # Already present in the destination folder: nothing to do.
                continue
            try:
                shutil.copy(file, target)
            except OSError:
                # shutil.Error is a subclass of OSError, so this also covers
                # permission problems and unreachable network folders.
                logging.error(f'copy file {filename} failed')
            else:
                logging.info(f'file {filename} is copied successfully')

        elif operation == "move":
            try:
                shutil.move(file, target)
            except OSError:
                logging.error(f'moving file {filename} failed')
            else:
                logging.info(f'file {filename} is moved successfully')
    Step 5: Put all together
    I added some logging and print statements to track what the script does.
    import os
    import sys
    import glob
    from datetime import datetime
    import time
    import shutil
    import configparser
    import logging
    import ctypes
    
    
    # Set the title of the console window. ctypes.windll only exists on
    # Windows, so the call is skipped on other platforms instead of crashing
    # the script at start-up.
    if hasattr(ctypes, 'windll'):
        ctypes.windll.kernel32.SetConsoleTitleW("Dir-Watcher by Tarek Meftah")

    # Append all log records (INFO and above) to log.log in the working directory.
    logging.basicConfig(
        filename='log.log',
        filemode='a',
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    # Load the watcher settings from the [SETTING] section of conf.cfg.
    config = configparser.ConfigParser(allow_no_value=True)
    config.read('conf.cfg')

    # Minimum file age; the watch loop compares it against the age in seconds.
    timeinterval = config.getint('SETTING', 'timeinterval')
    # Glob pattern describing the files to watch.
    src_path = config.get('SETTING', 'src_path')
    # Folder the matched files are copied/moved to.
    dist_path = config.get('SETTING', 'dist_path')
    # Either "copy" or "move".
    operation = config.get('SETTING', 'operation')

    # Record the start of a watch session together with the active settings.
    logging.info(100*'*')
    logging.info('start watching')
    logging.info(f'src_path = {src_path}')
    logging.info(f'dist_path = {dist_path}')
    logging.info(f'operation = {operation}')
    logging.info(100*'*')

    # Start-up banner on the console.
    print()
    print()
    print(100* '*')
    print()
    print('      created by Tarek Meftah ')
    print('      watch folder for changes and copy or move files to another distination folder.')
    print()
    print(100* '*')
    print()
    print()
    print(f'     +    watching folder {src_path} for change ')
    # Main watch loop: once per second, scan src_path and copy/move every
    # matching file that is older than `timeinterval` seconds. Ctrl+C asks the
    # user whether the watcher should really stop.
    while True:
        try:
            try:
                # Re-evaluate the glob on every pass so new files are picked up.
                for file in glob.glob(src_path):
                    filename = os.path.basename(file)
                    target = os.path.join(dist_path, filename)

                    # Age of the file, based on os.path.getctime().
                    file_datetime = datetime.fromtimestamp(os.path.getctime(file))
                    age = datetime.now() - file_datetime

                    # total_seconds() covers the whole timedelta. The .seconds
                    # attribute only holds the part below one day, so a file
                    # that is one day and three seconds old would wrongly
                    # look "too recent".
                    if age.total_seconds() <= timeinterval:
                        continue

                    if operation == "copy":
                        if os.path.isfile(target):
                            # Already copied on an earlier pass.
                            continue
                        try:
                            shutil.copy(file, target)
                        except OSError:
                            # shutil.Error is a subclass of OSError, so this
                            # also covers permission problems and unreachable
                            # network folders.
                            logging.error(f'copy file {filename} failed')
                        else:
                            logging.info(f'file {filename} is copied successfully')

                    elif operation == "move":
                        try:
                            shutil.move(file, target)
                        except OSError:
                            logging.error(f'moving file {filename} failed')
                        else:
                            logging.info(f'file {filename} is moved successfully')

            except Exception as e:
                # Python 3 exceptions have no .message attribute; formatting
                # the exception itself gives the text without raising a second
                # error inside the handler.
                logging.error(f'error: {e} ')

            # Runs after a failed pass as well, so a recurring error cannot
            # turn the watcher into a busy loop that floods the log.
            time.sleep(1)

        except KeyboardInterrupt:
            confirm = input('Enter "yes" to cancel or "no" to keep running [yes/no]:').strip().lower()
            if confirm in ('yes', 'y'):
                logging.info(47* '*'+' Exit '+47* '*' +'\n')
                break

            if confirm in ('no', 'n'):
                print("     + Keep runnning!")
            else:
                print ('    + Sorry, no valid answer...')
    Hopefully you can use this script in your daily work. I used PyInstaller to generate an executable that I can use wherever I want without having to install Python.

    25

    This website collects cookies to deliver better user experience

    Directory watcher in 5 Minutes