# taskwarrior-grocy/apps/grocy.py
#
# 118 lines
# 3.5 KiB
# Python

from twservices.apps import TaskService
from datetime import datetime
from twservices.apps import RestService
# NOTE(review): this header name is defined but not used below; the REST
# client is configured with an 'Authorization' header instead. Confirm which
# header the target Grocy instance expects.
GROCY_API_KEY_HEADER = 'GROCY_API_KEY'

# Endpoints
GROCY_FIND_ALL_TASKS_ENDPOINT = '/get-objects/tasks'
GROCY_DELETE_TASK_ENDPOINT = '/delete-object/tasks/{0}'
# Original (misspelled) name kept as an alias so existing references keep working.
GROCY_DELTE_TASK_ENDPOINT = GROCY_DELETE_TASK_ENDPOINT
GROCY_ADD_TASK_ENDPOINT = '/add-object/tasks'
# Assumed to mirror the delete endpoint's shape, following Grocy's legacy
# `edit-object/{entity}/{id}` route -- TODO confirm against the Grocy version in use.
GROCY_MODIFY_TASK_ENDPOINT = '/edit-object/tasks/{0}'


class Grocy(TaskService):
    """Task service that talks to a Grocy instance through a RestService.

    The constructor expects a mapping-like ``config`` providing an ``'api'``
    entry (base URL of the Grocy API) and a ``'token'`` entry (sent as the
    ``Authorization`` header on every request).
    """

    def __init__(self, config):
        """Store the API URL and token and build the REST client.

        Raises:
            Exception: if ``config['api']`` or ``config['token']`` is None.
        """
        if config['api'] is None:
            raise Exception('api url for Grocy is not defined')
        # TODO: check if api is a url
        self.api = config['api']
        if config['token'] is None:
            raise Exception('token for Grocy is not defined')
        self.token = config['token']
        self.rest_service = self.__initRestService()

    @staticmethod
    def __field(task, key):
        """Return ``task[key]``, or None when the key is absent.

        Optional task fields may be missing entirely on plain dicts; treating
        a missing key like an empty value keeps the conversion from failing.
        """
        try:
            return task[key]
        except KeyError:
            return None

    @staticmethod
    def __asTaskList(task):
        """Normalise a single task or a list of tasks into a list.

        Raises:
            Exception: if ``task`` is falsy (None, empty list, ...).
        """
        if not task:
            raise Exception('task is not defined')
        return task if isinstance(task, list) else [task]

    def __convertToGrocyTask(self, task):
        """Build the dict that is sent to Grocy for one task.

        ``task['id']`` is required. The other fields ('description',
        'annotations', 'entry', 'due', 'done') are copied only when present
        and truthy; 'entry' and 'due' are used only when they are
        ``datetime`` instances and are formatted as ``YYYY-MM-DD``.
        """
        grocy_task = {'id': int(task['id'])}

        description = self.__field(task, 'description')
        if description:
            grocy_task['name'] = str(description)

        annotations = self.__field(task, 'annotations')
        if annotations:
            # One annotation per line, each terminated by a newline.
            grocy_task['description'] = ''.join(
                '{}\n'.format(note) for note in annotations
            )

        entry = self.__field(task, 'entry')
        if entry and isinstance(entry, datetime):
            grocy_task['row_created_timestamp'] = entry.strftime('%Y-%m-%d')

        due = self.__field(task, 'due')
        if due and isinstance(due, datetime):
            grocy_task['due_date'] = due.strftime('%Y-%m-%d')

        done = self.__field(task, 'done')
        if done:
            grocy_task['done'] = done

        return grocy_task

    def __initRestService(self):
        """Create the RestService used for all requests.

        A single leading and a single trailing '/' are removed from the
        configured API URL before it is handed to the client (every endpoint
        constant already starts with '/').
        """
        pruned_api_url = self.api
        if pruned_api_url.startswith('/'):
            pruned_api_url = pruned_api_url[1:]
        if pruned_api_url.endswith('/'):
            pruned_api_url = pruned_api_url[:-1]
        rest_service = RestService(pruned_api_url, json=True)
        rest_service.addHeader('Authorization', self.token)
        return rest_service

    def findAll(self):
        """Fetch all tasks and return the REST client's response.

        Any exception raised by the client is printed and re-raised.
        """
        # Get all tasks as json
        try:
            return self.rest_service.get(GROCY_FIND_ALL_TASKS_ENDPOINT)
        except Exception as e:
            print(str(e))
            raise

    def delete(self, id):
        """Delete the task with the given id and return the client's response.

        Raises:
            Exception: if ``id`` is falsy; client errors are printed and
                re-raised.
        """
        if not id:
            raise Exception('id is not defined')
        # Delete a task
        # NOTE(review): the endpoint template and the id are passed separately,
        # presumably RestService fills in the '{0}' placeholder -- confirm.
        try:
            return self.rest_service.get(GROCY_DELETE_TASK_ENDPOINT, id)
        except Exception as e:
            print(str(e))
            raise

    def add(self, task):
        """Add one task or a list of tasks.

        Each task is converted with ``__convertToGrocyTask`` and posted
        individually.

        Returns:
            list: one client response per task, in input order.

        Raises:
            Exception: if ``task`` is falsy; conversion and client errors are
                printed and re-raised.
        """
        tasks = self.__asTaskList(task)
        responses = []
        try:
            for next_task in tasks:
                grocy_task = self.__convertToGrocyTask(next_task)
                response = self.rest_service.post(
                    GROCY_ADD_TASK_ENDPOINT, grocy_task
                )
                responses.append(response)
        except Exception as e:
            print(str(e))
            raise
        return responses

    def modify(self, task):
        """Modify one task or a list of tasks.

        Each task is converted with ``__convertToGrocyTask`` and sent
        individually together with its own id.

        Returns:
            list: one client response per task, in input order.

        Raises:
            Exception: if ``task`` is falsy; conversion and client errors are
                printed and re-raised.
        """
        tasks = self.__asTaskList(task)
        responses = []
        try:
            for next_task in tasks:
                grocy_task = self.__convertToGrocyTask(next_task)
                response = self.rest_service.patch(
                    GROCY_MODIFY_TASK_ENDPOINT, grocy_task['id'], grocy_task
                )
                responses.append(response)
        except Exception as e:
            print(str(e))
            raise
        return responses
# TODO: do this part next
def sync():