Step by step, implement a simple Python background task
These days, I am working on a small-scale Django project for user data analysis. Naturally, we use a machine learning model for the analysis. To improve the model with newly uploaded data, we retrain it periodically. Here is a small pseudocode example of training the model continuously.
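import mysql.connector

# database_config is assumed to be a dict of connection settings (host, user, password, database).
mydb = mysql.connector.connect(**database_config)
cursor = mydb.cursor()
while True:
    model = load_model(model_path)  # pseudocode: load the saved model
    cursor.execute("SELECT * FROM your_table")
    data = cursor.fetchall()  # execute() does not return the rows itself
    model.train(data)
    save_model(model, model_path)  # pseudocode: persist the retrained model
    mydb.commit()  # end the read transaction so the next round sees new rows
# Never reached while the loop above runs forever.
cursor.close()
mydb.close()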
This is an infinite loop because we continuously train our model. It is worth noting that you cannot put this training process in any of your main tasks (for example, an event after clicking a button) because this blocks your main workflow (or the main thread, which is the thread a customer uses). If you put an infinite loop in your main workflow by accident, your customer cannot use it, and the application may even crash!
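To make the blocking problem concrete, here is a minimal sketch. The names slow_training_step and handle_button_click are made up for illustration, and the loop is bounded to three rounds so that the demo finishes; with a real `while True` loop the handler would never return.

```python
import time


def slow_training_step(seconds=0.1):
    """Stand-in for one round of model training (hypothetical)."""
    time.sleep(seconds)


def handle_button_click(rounds=3):
    """Hypothetical handler that trains inline, in the main workflow."""
    start = time.monotonic()
    for _ in range(rounds):  # with `while True` this would never return
        slow_training_step()
    # The caller gets a response only after all the training work is done.
    return time.monotonic() - start


elapsed = handle_button_click()
print(f"The user waited {elapsed:.2f} s for a response")
```

The user's wait grows with every training round, which is exactly what we want to avoid.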
To deal with this problem, we can run the training task in a separate thread or process. If you are not familiar with threads and processes, or with the differences between them, please read this introduction on Wikipedia.
Python supports both multithreading and multiprocessing through the threading and multiprocessing libraries. Both are part of the Python standard library, so you do not have to install them; you just import them. Since a thread is more lightweight than a process, I will use a thread, and therefore the threading library, for my background training task. Let's use the threading library to reimplement the function so that the above pseudocode runs concurrently with the user's workflow.
import mysql.connector
from threading import Thread

class CreateTrainModelPeriodicallyThread(Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        try:
            # database_config is assumed to be a dict of connection settings.
            mydb = mysql.connector.connect(**database_config)
            cursor = mydb.cursor()
            while True:
                model = load_model(model_path)  # pseudocode: load the saved model
                cursor.execute("SELECT * FROM your_table")
                data = cursor.fetchall()
                model.train(data)
                save_model(model, model_path)  # pseudocode: persist the retrained model
                mydb.commit()
            # Never reached while the loop above runs forever.
            cursor.close()
            mydb.close()
        except Exception as e:
            print(e)
Note that we create a thread task by writing a new class that inherits from Python's built-in Thread class. The __init__(self) method initializes the thread task, and I will use it later to add more features. The body of the training task goes in the run method of the class. To start the task, you first need to create the thread object. Here is an example.
myTrainTask = CreateTrainModelPeriodicallyThread()
Then you can start the task wherever you need it by calling the start method inherited from the Thread class.
myTrainTask.start()
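If you want to try the create-then-start pattern without a database, the following minimal sketch may help. CountingThread is a hypothetical stand-in for the training thread, and its loop is bounded so that the demo ends. The join call at the end is there only so the script can wait and print the result.

```python
import time
from threading import Thread


class CountingThread(Thread):
    """Hypothetical stand-in for the training thread."""

    def __init__(self):
        super().__init__()
        self.rounds_done = 0

    def run(self):
        # run() is executed in the new thread once start() is called.
        for _ in range(3):  # bounded so the demo terminates
            time.sleep(0.05)  # pretend to train
            self.rounds_done += 1


task = CountingThread()
task.start()  # returns immediately; the work continues in the background
alive_right_after_start = task.is_alive()
print("main thread keeps going; worker alive:", alive_right_after_start)
task.join()  # wait here only so the demo can show the final count
print("rounds done:", task.rounds_done)
```

Note that you call start, not run. Calling run directly would execute the loop in the current thread and block it.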
Let's think about another problem. Imagine you provide the service to different users, so the model must analyze each user's own data. Maybe you create a separate table for each user. In the example above, the table name is fixed, but now you want to extract data from different tables, so the table name becomes an input variable. You never call the run method yourself, so how can you pass a variable to your task?
This is where the __init__(self) method comes in; it initializes an object of a class in Python. To pass variables to a thread, add parameters to the __init__ method and store them on the object. For example, to pass the table name, you can implement it as in the example below.
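import mysql.connector
from threading import Thread

class CreateTrainModelPeriodicallyThread(Thread):
    def __init__(self, table_name):
        super().__init__()
        self.table_name = table_name  # keep the argument so run() can read it

    def run(self):
        try:
            # database_config is assumed to be a dict of connection settings.
            mydb = mysql.connector.connect(**database_config)
            cursor = mydb.cursor()
            while True:
                model = load_model(model_path)  # pseudocode: load the saved model
                # Only pass trusted table names: the name is formatted straight into the SQL.
                cursor.execute("SELECT * FROM {}".format(self.table_name))
                data = cursor.fetchall()
                model.train(data)
                save_model(model, model_path)  # pseudocode: persist the retrained model
                mydb.commit()
            # Never reached while the loop above runs forever.
            cursor.close()
            mydb.close()
        except Exception as e:
            print(e)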
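The same idea can be tried in isolation. The sketch below uses a hypothetical BuildQueryThread that only builds the query string instead of running it, and the table names are made up. It also shows an alternative that I did not use above: passing a function and its arguments to Thread directly.

```python
from threading import Thread


class BuildQueryThread(Thread):
    """Hypothetical thread that only builds the query string it would run."""

    def __init__(self, table_name):
        super().__init__()
        self.table_name = table_name  # keep the argument on the instance
        self.query = None

    def run(self):
        # run() takes no arguments, so it reads the value through self.
        self.query = "SELECT * FROM {}".format(self.table_name)


# One thread per (made-up) user table.
threads = [BuildQueryThread(name) for name in ("user_a_data", "user_b_data")]
for t in threads:
    t.start()
for t in threads:
    t.join()
queries = [t.query for t in threads]
print(queries)

# Alternative without subclassing: pass a function and its arguments.
results = []
worker = Thread(target=lambda name: results.append(name.upper()), args=("user_c_data",))
worker.start()
worker.join()
print(results)
```

Subclassing is convenient when the task needs its own state and extra methods, which is the case for the stop mechanism in this post.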
Of course, you can do more with the __init__(self) method. Suppose your customer needs to be able to terminate the task at any time. An infinite loop never terminates by itself, and Python does not officially provide a stop or exit method for a thread. However, we can implement our own stop mechanism.
To do so, we use the Event object from the threading library. An Event is used for communication between threads: one thread signals the event, and other threads wait for it. An Event has an internal flag. If the flag of an event eventA is set, eventA.is_set() returns True. To set the flag, call eventA.set(); to reset it, call eventA.clear(). Let's add an event, for example _stopevent, that indicates whether the running task should terminate. We keep training the model as long as _stopevent.is_set() returns False. If you want to block a thread until the flag is set or a timeout elapses, you can use eventA.wait(timeout).
Let's set up the event. Also, if you want the task to run at a fixed time interval, you can use the sleep function from Python's built-in time library, which blocks the calling thread for the given number of seconds.
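Before putting the event into the thread, it may help to try its methods on their own. The snippet below is a minimal sketch; the variable names are mine, and the 0.05-second timeouts are arbitrary.

```python
from threading import Event

stop_event = Event()

initially_set = stop_event.is_set()  # the flag starts out unset
stop_event.set()
after_set = stop_event.is_set()
stop_event.clear()
after_clear = stop_event.is_set()

# wait(timeout) returns True as soon as the flag is set,
# or False if the timeout elapses first.
timed_out_result = stop_event.wait(0.05)  # nobody sets the flag here
stop_event.set()
immediate_result = stop_event.wait(0.05)  # the flag is already set

print(initially_set, after_set, after_clear, timed_out_result, immediate_result)
```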
import time
import mysql.connector
from threading import Event, Thread

class CreateTrainModelPeriodicallyThread(Thread):
    def __init__(self, table_name):
        super().__init__()
        self.table_name = table_name
        self._stopevent = Event()  # initialize the event
        self._sleepperiod = 1.0  # wait up to 1 second for a stop signal after each round

    def run(self):
        try:
            self._stopevent.clear()  # make sure the stop flag is unset
            # database_config is assumed to be a dict of connection settings.
            mydb = mysql.connector.connect(**database_config)
            cursor = mydb.cursor()
            while not self._stopevent.is_set():
                time.sleep(5 * 60)  # train the model every 5 minutes
                model = load_model(model_path)  # pseudocode: load the saved model
                cursor.execute("SELECT * FROM {}".format(self.table_name))
                data = cursor.fetchall()
                model.train(data)
                save_model(model, model_path)  # pseudocode: persist the retrained model
                mydb.commit()
                self._stopevent.wait(self._sleepperiod)
            cursor.close()
            mydb.close()
        except Exception as e:
            print(e)
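One detail worth checking is how quickly a thread reacts to a stop request while it pauses between rounds. A time.sleep call cannot be interrupted, so a thread that sleeps for minutes only notices the request after it wakes up, whereas Event.wait(timeout) returns as soon as the flag is set. The sketch below uses a hypothetical PeriodicWorker that pauses with wait only; the 60-second interval is an arbitrary stand-in for a long pause.

```python
import time
from threading import Event, Thread


class PeriodicWorker(Thread):
    """Hypothetical worker that 'trains' once per interval until stopped."""

    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.rounds = 0
        self._stopevent = Event()

    def run(self):
        while not self._stopevent.is_set():
            self.rounds += 1  # stand-in for one training round
            # Pauses up to `interval` seconds, but returns at once on set().
            self._stopevent.wait(self.interval)

    def stop(self):
        self._stopevent.set()


worker = PeriodicWorker(interval=60)
worker.start()
time.sleep(0.1)  # give the first round time to run
start = time.monotonic()
worker.stop()
worker.join()
stop_latency = time.monotonic() - start
print(f"stopped after {stop_latency:.3f} s, rounds: {worker.rounds}")
```

If a fast shutdown matters for your task, using the event's wait for the whole pause is an option to consider.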
Now the thread keeps running as long as _stopevent is unset. Finally, let's add a way to stop the thread when we want. We can override the join method of the Thread class for this. Be careful: the join method does not kill a thread. It blocks the calling thread until the thread being joined has finished. This is different from a daemon thread. A daemon thread runs in the background and does not keep the program alive. In Java, for example, the garbage collector runs in a daemon thread, and the Java virtual machine (JVM) exits once only daemon threads are left. Daemon threads in Python work similarly and are stopped abruptly when the program exits. Our training thread is not a daemon thread by default, so calling join on it while the loop is still running would block indefinitely. Therefore, before joining, we must stop the training loop by setting _stopevent, so that _stopevent.is_set() returns True, the loop exits, and join can return.
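The behavior of join can be checked with a small experiment. This is only a sketch: short_job is a made-up task that ends by itself after 0.2 seconds, and the timeout value is arbitrary.

```python
import time
from threading import Thread


def short_job():
    time.sleep(0.2)  # stand-in for a task that ends by itself


worker = Thread(target=short_job)
worker.start()

worker.join(timeout=0.01)  # stops waiting after 0.01 s ...
still_running = worker.is_alive()  # ... but the thread itself keeps running

worker.join()  # blocks the caller until short_job returns
finished = not worker.is_alive()
print(still_running, finished)

# The daemon flag is chosen when the thread is created (or before start()).
background = Thread(target=short_job, daemon=True)
is_daemon = background.daemon
default_daemon = worker.daemon  # threads created from the main thread are non-daemon
```

A join with a timeout only limits how long the caller waits; it never stops the thread.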
We have our complete workflow below.
import time
import mysql.connector
from threading import Event, Thread

class CreateTrainModelPeriodicallyThread(Thread):
    def __init__(self, table_name):
        super().__init__()
        self.table_name = table_name
        self._stopevent = Event()  # initialize the event
        self._sleepperiod = 1.0  # wait up to 1 second for a stop signal after each round

    def run(self):
        try:
            self._stopevent.clear()  # make sure the stop flag is unset
            # database_config is assumed to be a dict of connection settings.
            mydb = mysql.connector.connect(**database_config)
            cursor = mydb.cursor()
            while not self._stopevent.is_set():
                time.sleep(5 * 60)  # train the model every 5 minutes
                model = load_model(model_path)  # pseudocode: load the saved model
                cursor.execute("SELECT * FROM {}".format(self.table_name))
                data = cursor.fetchall()
                model.train(data)
                save_model(model, model_path)  # pseudocode: persist the retrained model
                mydb.commit()
                self._stopevent.wait(self._sleepperiod)
            cursor.close()
            mydb.close()
        except Exception as e:
            print(e)

    def join(self, timeout=None):
        self._stopevent.set()  # set the stop event so the training loop terminates
        super().join(timeout)
After overriding the join method, you can terminate the thread by calling it. The call returns once the thread has finished its current sleep and training round.
myTrainTask.join()
But please note that once the thread has stopped, you cannot restart it. The only way to "restart" such a thread is to create a new one.
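You can verify this with a quick experiment. In the sketch below, make_task is a hypothetical factory that returns a fresh thread doing nothing; starting the same Thread object a second time raises a RuntimeError.

```python
from threading import Thread


def make_task():
    """Hypothetical factory; returns a fresh thread each time."""
    return Thread(target=lambda: None)


task = make_task()
task.start()
task.join()

try:
    task.start()  # a Thread object can be started only once
    restart_error = None
except RuntimeError as exc:
    restart_error = exc
print("restart failed:", restart_error)

task = make_task()  # "restart" by creating a new thread object
task.start()
task.join()
```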
Building such a thread to run a background job is easy for a small project. However, in a large project you have to manage many different tasks, each executed by a thread, and you may need a thread pool or a job queue to manage them. In that case, I recommend reading more about the "Django-background-tasks" package for Django projects. I tried it before, but it did not schedule tasks as I expected (my Django version is 3.0+, which is not compatible with this library). Another good choice is "Celery", a job queue that manages scheduled tasks. To set up a Celery application, you need a broker to exchange messages between the Celery workers and your Django application; "Redis" is a good choice.
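For the thread pool option, Python's standard library already offers concurrent.futures.ThreadPoolExecutor. The following is a minimal sketch and not what I used in my project; train_for_table and the table names are made up, and the function only returns a string instead of training a model.

```python
from concurrent.futures import ThreadPoolExecutor


def train_for_table(table_name):
    """Stand-in for one training round on one table (hypothetical)."""
    return "trained on {}".format(table_name)


tables = ["user_a_data", "user_b_data", "user_c_data"]  # made-up table names

# At most two rounds run at the same time; the rest wait in the pool's queue.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(train_for_table, tables))

print(results)
```

The pool reuses a fixed number of threads, so you do not create one thread per task, and map returns the results in the order of the inputs.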
Please implement your background task right after reading this blog!