25
Python in Appwrite
Appwrite is a Self hosted Back-end as a Service ( BaaS ) that allows developers to make apps easier and faster. It is designed to be run on any operating system or platform , and an Appwrite app can be written in most programming languages / frameworks ( Trust me they have lots of SDKs ).
It is still in its pre-release stage ( first pre-release on sep of 2019 or 2 years ago ) as of time of writing but it has seen substantial work ( 6.8k commits across 119 branches by 207 contributors ) put into it and a large and active community engagement.
I will be making an Anime list bot with Appwrite and Telegram to show you the ropes.
We will be using docker to install Appwrite, docker is an open source containerization platform , that is it allows developers to package their applications into little virtual spaces , isolated from the rest of the system , in what it calls containers.
install docker from here
docker run -it --rm \
--volume /var/run/docker.sock:/var/run/docker.sock \
--volume "$(pwd)"/appwrite:/usr/src/code/appwrite:rw \
--entrypoint="install" \
appwrite/appwrite:0.11.0
docker run -it --rm ^
--volume //var/run/docker.sock:/var/run/docker.sock ^
--volume "%cd%"/appwrite:/usr/src/code/appwrite:rw ^
--entrypoint="install" ^
appwrite/appwrite:0.11.0
docker run -it --rm ,
--volume /var/run/docker.sock:/var/run/docker.sock ,
--volume ${pwd}/appwrite:/usr/src/code/appwrite:rw ,
--entrypoint="install" ,
appwrite/appwrite:0.11.0
System requirements
Appwrite requires at least:
- 1 CPU core and
- 512MB (2GB Recommended) of RAM.
Or If ClamAV is installed :
- 2GB of RAM is required since ClamAV uses at least 1GB on its own.
Let the program do its thing, which will take quite a bit of time if you have a lower than average internet connection .
If you see this message you can proceed to the next step.
Errors you may encounter
docker: Error response from daemon: status code not OK but 500:
This means that docker does not have permissions to access the directory you want to install on, and can be easily fixed by doing the following :
- Go to Settings > Resources > File Sharing
- Then add the directory to the list
After installing Appwrite you go to localhost or 127.0.0.1 in your preferred browser to access the Appwrite Dashboard.
For its first run you have to Create a Root user by :
- Providing it your preferred name , *
- Adding in an Email address ,
- Creating a strong password and
- Finally accepting the terms and conditions.
All pretty standard sign up procedures that should land you here when done correctly.
Just click on Create project or the + sign on the dashboard and name it however you desire.
Then grab the Project id and Api Endpoint from the settings tab for later use in the SDK .
After noting them down , go to the API keys tab
Create a new key with all the Scopes you need.
Install the python Sdk using pip
pip install appwrite
install python telegram bot with
pip install python-telegram-bot
install jikanPy with
pip install jikanpy
install dotenv with
pip install python-dotenv
We first need to Create a telegram bot by using
@botfather
on telegram and obtain a bot token.
Then create a new directory and place some boilerplate code to control the bot
main.py
+ import dotenv
+
+ from os import getenv
+ from telegram.ext import Updater
+ from telegram.ext import CommandHandler
+
+ token = dotenv.get_key('.env','TOKEN') or getenv('TOKEN')
+
+ updater = Updater(token=token, use_context=True)
+ dispatcher = updater.dispatcher
+
+ def start(update, context):
+ context.bot.send_message(
+ chat_id=update.effective_chat.id,
+ text="Hi there , im alive")
+
+ def main():
+ "Main Execution"
+ start_handler = CommandHandler('start', start)
+ dispatcher.add_handler(start_handler)
+ updater.start_polling()
+
+ if __name__ == '__main__':
+ main()
+
then add your token in a .env file as follows
.env
+ TOKEN=<###########:YOUR_TOKEN_HERE-KSDFJAKSDJFHKSJDHF>
Next up lets set up the appwrite boiler plate code as follows
import dotenv
from os import getenv
from telegram.ext import Updater
from telegram.ext import CommandHandler
+ from appwrite.client import Client
token = dotenv.get_key('.env','TOKEN') or getenv('TOKEN')
+ ENDPOINT = dotenv.get_key('.env','ENDPOINT') or getenv('ENDPOINT')
+ PROJECTID = dotenv.get_key('.env','PROJECTID') or getenv('PROJECTID')
+ APIKEY = dotenv.get_key('.env','APIKEY') or getenv('APIKEY')
updater = Updater(token=token, use_context=True)
dispatcher = updater.dispatcher
+ client = Client()
+ (client
+ .set_endpoint(F'http://{ENDPOINT}/v1') # Your API Endpoint
+ .set_project(PROJECTID) # Your project ID
+ .set_key(APIKEY) # Your secret API key
+ )
def start(update, context):
context.bot.send_message(
chat_id=update.effective_chat.id,
text="Hi there , im alive")
def main():
"Main Execution"
start_handler = CommandHandler('start', start)
dispatcher.add_handler(start_handler)
updater.start_polling()
if __name__ == '__main__':
main()
dont forget to add the respective values to the .env file
TOKEN=<###########:YOUR_TOKEN_HERE-KSDFJAKSDJFHKSJDHF>
+ ENDPOINT=localhost
+ PROJECTID=617bf##f####d
+ APIKEY=20e###a379c0f306b9637fff7fae6b0cb73ab98453300613d84fdd74fd6c3c947dda#####c9b05e7####ad3e5bc2adc5c782192d4d586f51e2a668c9de5a44d7fa6f43###bc737216e43bbd##faa949cf######fdf9779eceb1e938e4aaba4073eb8c1c0ce7a2b216a75d4f532545dc7795dd92f42d70c#####79069d783a187
Next up add a collection with the following rules (key:datatype)
in the Appwrite dashboard
Make sure not to include special chars in your keyname
- userID : numeric
- MalList : numeric ( array like )
- state : text ( array like )
then add that to the code as follows :
.env
TOKEN=<###########:YOUR_TOKEN_HERE-KSDFJAKSDJFHKSJDHF>
ENDPOINT=localhost
PROJECTID=617bf##f####d
APIKEY=20e###a379c0f306b9637fff7fae6b0cb73ab98453300613d84fdd74fd6c3c947dda#####c9b05e7####ad3e5bc2adc5c782192d4d586f51e2a668c9de5a44d7fa6f43###bc737216e43bbd##faa949cf######fdf9779eceb1e938e4aaba4073eb8c1c0ce7a2b216a75d4f532545dc7795dd92f42d70c#####79069d783a187
+ COLLECTIONID=617c51c613eab
main.py
TOKEN = dotenv.get_key('.env','TOKEN') or getenv('TOKEN')
ENDPOINT = dotenv.get_key('.env','ENDPOINT') or getenv('ENDPOINT')
PROJECTID = dotenv.get_key('.env','PROJECTID') or getenv('PROJECTID')
APIKEY = dotenv.get_key('.env','APIKEY') or getenv('APIKEY')
+ COLLECTIONID = dotenv.get_key('.env', 'COLLECTIONID') or getenv('COLLECTIONID')
Appwrite offers the database API for storing persistent data.
I will first explain the basics of creating, reading from, updating and deleting with the api. And then explain how i've used it in an actual demo app.
Initial setup :
the following code allows access to the database variable which is an instance of Database class.
from appwrite.client import Client
from appwrite.services.database import Database
client = Client()
(client
.set_endpoint('https://[HOSTNAME_OR_IP]/v1') # Your API Endpoint
.set_project('5df5acd0d48c2') # Your project ID
.set_key('919c2d18fb5d4...a2ae413da83346ad2') # Your secret API key
)
database = Database(client)
Collections are the sql equivalent of tables for appwrite.
The API methods to access them are as follows:
Create collection :
Parameters :
name : string (required)
read : array of strings with read perms (required)
write : array of strings with write perms (required)
rules : array of rule objects (required)
Returns :
collection object
Code :
database.create_collection('[NAME]', [], [], [])
List collection :
Gets a list of a users collections
Parameters :
search : string ( term to search for max 256 chars)
limit : integer ( default 25 , max 100 )
offset : integer ( default 0 , used for pagination )
orderType: string (ASC for ascending order and DESC for descending )
Returns :
Collection list object
Code :
database.list_collections()
Get collection:
Gets a collection via its unique id.
Parameters :
collectionid : string required
Returns :
Collection object
*Code *:
database.get_collection('[COLLECTION_ID]')
Update collection :
Updates a collection via its unique id.
Parameters :
collectionid: string required
name : string required (collection name)
read : array of strings with read perms (required)
write : array of strings with write perms (required)
rules : array of rule objects (required)
Returns :
Collection object
Code :
database.update_collection('[COLLECTION_ID]', '[NAME]')
Delete collection :
Deletes a collection via its unique id.
Parameters :
collectionid: string required
Returns :
HTTP code 204
Code :
database.delete_collection('[COLLECTION_ID]')
Documents are the sql equivalent of records or entries
and the api methods to access them are as follows
Create Document :
Parameters :
collectionId : string (required)
data : json object (required)
read : array of strings with Read perms
write : array of strings with write perms
parentDocument : string
parentProperty : string
parentPropertyType : string
Returns :
Document object
Code :
database.create_document('[COLLECTION_ID]', {})
List Document :
Gets a list of a users documents
Parameters :
collectionId : string (required)
filters : array of filter strings
limit : integer ( default 25 , max 100 )
offset: integer ( default 0 , used for pagination)
orderField : string ( field to sort by )
orderType : string (ASC for ascending and DESC for decending)
orderCast : string (int, string, date, time or datetime)
search : string
Returns :
Documeents list object
Code :
database.list_documents('[COLLECTION_ID]')
Get Document:
Gets a document via its unique id.
Parameters :
collectionid : string required
documentId : string required
Returns :
Document object
*Code *:
database.get_document('[COLLECTION_ID]', '[DOCUMENT_ID]')
Update Document :
Updates a Document via its unique id.
Parameters :
collectionid: string required
documentId : string required
data : Json object required
read : array of strings with read perms
write : array of strings with write perms
Returns :
Document object
Code :
database.update_document('[COLLECTION_ID]', '[DOCUMENT_ID]', {})
Delete Document :
Deletes a Document via its unique id.
Parameters :
collectionid: string required
documentId : string required
Returns :
HTTP code 204
Code :
database.delete_document('[COLLECTION_ID]', '[DOCUMENT_ID]')
After you have completed the project initialization steps and gotten yourself familiar with the methods you can fork the project repo and follow along.
Creating a document :
def set_state(update, context):
"""
Updates the database state and id,
generally handles the insert and update side of things
"""
user_id = update.effective_user.id
cqd = update.callback_query.data
state = str(cqd.split('-')[1])
mal_id = int(cqd.split('-')[-1])
exists = database.list_documents(
COLLECTIONID,
filters=[f'userID={user_id}'])
if exists['sum'] == 0:
print('Data not found , create ')
# been deleted or never existed
payload = {
"userID": user_id,
"MalList": [mal_id],
"state": [state]
}
database.create_document(COLLECTIONID, payload)
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f'Successfully added {mal_id} to list ')
else:
# data exists , update
print('Data exists , update ')
doc_id = exists['documents'][0]['$id']
print(exists)
if mal_id in exists['documents'][0]['MalList']:
context.bot.send_message(
chat_id=update.effective_chat.id,
text=(
f'{mal_id} is already on your list\n'
'use /update {id} to update its state\n'
))
else :
malist = list(exists['documents'][0]['MalList'])
new_state = list(exists['documents'][0]['state'])
new_state.append(str(state))
malist.append(int(mal_id))
payload = {
"userID": user_id,
"MalList": malist,
"state": new_state
}
updated = database.update_document(
COLLECTIONID,
doc_id,
payload)
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f'Added {mal_id} to list. ')
In the set_state function of the app in the repo you will find that we use database.create_document , specifically here
payload = {
"userID": user_id,
"MalList": [mal_id],
"state": [state]
}
database.create_document(COLLECTIONID, payload)
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f'Successfully added {mal_id} to list ')
We pass it a payload of key value pairs that conform to the rules we set during the initialization phase, and have it add our anime shows to the collection we created.
and here
payload = {
"userID": user_id,
"MalList": malist,
"state": new_state
}
updated = database.update_document(
COLLECTIONID,
doc_id,
payload)
we see database.update_document in use to update the document in case it exists , instead of creating a new one. it uses one more parameter called doc_id i.e the document id that is parsed from the existing document
doc_id = exists['documents'][0]['$id']
which was fetched by using database.list_documents and passing in a filter where the userID attribute is equal to the current user of the bot (update.effective_user.id)
exists = database.list_documents(
COLLECTIONID,
filters=[f'userID={user_id}'])
we can see delete actions used in delete_item
def delete_item(update, context):
"""
deletes entry
"""
user_id = update.effective_user.id
if not len(context.args) == 1:
context.bot.send_message(
chat_id=update.effective_chat.id,
text='Usage : /delete {id} ')
else:
# id of anime in question
try:
mal_id = int(context.args[0])
# if doesnt exist ... ignore
docs = database.list_documents(
COLLECTIONID,
filters=[f'userID={user_id}'])
print(docs)
if len(docs['documents']) == 0:
context.bot.send_message(
chat_id=update.effective_chat.id,
text="Nothing in your list to delete , start adding more via /anime.",
parse_mode=ParseMode.HTML)
else:
docs = docs['documents'][0] # there should only be one doc
doc_id = docs['$id']
print(docs)
mal_ids = docs['MalList']
statuses = docs['state']
print(mal_ids)
print(statuses)
last_items = len(mal_ids) == 1 and len(statuses) == 1
correct_item = mal_id == mal_ids[0]
if last_items and correct_item:
# delete the whole document as its the last item
database.delete_document(COLLECTIONID, doc_id)
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f"Removed {mal_id} From your list , List is now empty .",
parse_mode=ParseMode.HTML)
elif not correct_item:
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f"{mal_id} isnt in your list ",
parse_mode=ParseMode.HTML)
else:
index = mal_ids.index(mal_id)
del mal_ids[0]
del statuses[0]
payload = {
"userID": user_id,
"MalList": mal_ids,
"state": statuses
}
database.update_document(
COLLECTIONID,
doc_id,
payload)
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f'Deleted {mal_id} from list ')
except ValueError:
context.bot.send_message(
chat_id=update.effective_chat.id,
text='Usage : /delete {id} ')
Specifically at this part where we delete the document if the item is the last in the list , we pass it the collection id and the doc_id and then send a confirmation message to the user
if last_items and correct_item:
# delete the whole document as its the last item
database.delete_document(COLLECTIONID, doc_id)
context.bot.send_message(
chat_id=update.effective_chat.id,
text=f"Removed {mal_id} From your list , List is now empty .",
parse_mode=ParseMode.HTML)
We use database.list_documents in the list_list function to generate a pretty formatted list like this
docs = database.list_documents(
COLLECTIONID,
filters=[f'userID={user_id}'])
if len(docs['documents']) == 0:
context.bot.send_message(
chat_id=update.effective_chat.id,
text="No items in your list , start adding them via /anime.",
parse_mode=ParseMode.HTML)
else:
docs = docs['documents'][0] # there should only be one doc
print(docs)
mal_ids = docs['MalList']
statuses = docs['state']
anime_list = '<i><b> Your list : </b></i>\n\n'
i = 0
for id in mal_ids:
anime = jikan.anime(int(id))
anime_list = anime_list + f'[ <code>{anime["mal_id"]}</code> ] '
anime_list = anime_list + f'<b>{anime["title"]}</b>\n'
anime_list = anime_list + f'State : {statuses[i]}\n\n'
anime_list = anime_list + f'Status : {anime["status"]}\n'
anime_list = anime_list + f'Episodes : {anime["episodes"]}\n'
genres = [genre['name'] for genre in anime['genres']]
genre_string = ""
for genre in genres:
genre_string = genre_string + f" {genre} ,"
anime_list = anime_list + f'Genres : {genre_string[0:-1]}\n'
anime_list = anime_list + '\n'
print(anime)
i += 1
context.bot.send_message(
chat_id=update.effective_chat.id,
text=anime_list,
parse_mode=ParseMode.HTML)
We basically iterate over all documents returned by
docs = database.list_documents(
COLLECTIONID,
filters=[f'userID={user_id}'])
Then for each one we fecth that anime from the my anime list api using jikanPy then we do a bit of templates and string manipulation to get the nice list.
for id in mal_ids:
anime = jikan.anime(int(id))
anime_list = anime_list + f'[ <code>{anime["mal_id"]}</code> ] '
anime_list = anime_list + f'<b>{anime["title"]}</b>\n'
anime_list = anime_list + f'State : {statuses[i]}\n\n'
anime_list = anime_list + f'Status : {anime["status"]}\n'
anime_list = anime_list + f'Episodes : {anime["episodes"]}\n'
genres = [genre['name'] for genre in anime['genres']]
genre_string = ""
for genre in genres:
genre_string = genre_string + f" {genre} ,"
anime_list = anime_list + f'Genres : {genre_string[0:-1]}\n'
anime_list = anime_list + '\n'
print(anime)
i += 1
context.bot.send_message(
chat_id=update.effective_chat.id,
text=anime_list,
parse_mode=ParseMode.HTML)
Hit me up on github or telegram @robi_mez if you have any suggestions , comments or questions.
I would also like feedback on this blog as its my first time writing one <3.
25