# taskwarrior-grocy/twservices/apps/grocy.py
#
# 399 lines
# 15 KiB
# Python

from taskw.exceptions import TaskwarriorError
from twservices.apps import TaskService
from datetime import datetime
from twservices.apps import RestService
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Grocy(TaskService):
    """Synchronise Taskwarrior tasks with the task list of a Grocy server.

    The class converts tasks between the Taskwarrior and Grocy dictionary
    layouts and talks to Grocy through a ``RestService`` that sends the
    ``GROCY-API-KEY`` header.

    The attributes ``api``, ``token``, ``category`` and ``resolution`` are
    read from ``self`` but never assigned here.
    NOTE(review): presumably ``TaskService.__init__`` sets them from the
    ``**entries`` keyword arguments -- confirm against the base class.
    """

    API_KEY_HEADER = 'GROCY-API-KEY'

    # PROPS: keys of a Grocy task object
    ID = 'id'
    DONE = 'done'
    ASSIGNED_TO_USER_ID = 'assigned_to_user_id'
    ROW_CREATED_TIMESTAMP = 'row_created_timestamp'
    CATEGORY = 'category'
    DUE_DATE = 'due_date'
    NAME = 'name'

    # Endpoints (relative to the configured API base)
    FIND_ALL_TASKS_ENDPOINT = '/get-objects/tasks'
    FIND_ALL_CATEGORIES_ENDPOINT = '/get-objects/task_categories'
    DELETE_TASK_ENDPOINT = '/delete-object/tasks/{0}'
    ADD_CATEGORY_ENDPOINT = '/add-object/task_categories'
    ADD_TASK_ENDPOINT = '/add-object/tasks'
    MODIFY_TASK_ENDPOINT = '/edit-object/tasks'

    # UDA: Taskwarrior user defined attributes used to carry Grocy data
    UDA_GROCY_ID = 'grocy_id'
    UDA_GROCY_TW_UUID = '_uuid'
    UDA_GROCY_ASSIGNED_TO_USER_ID = 'grocy_assigned_to'
    UDA_GROCY_CATEGORY_ID = 'grocy_category_id'
    UDA_GROCY_CATEGORY_NAME = 'grocy_category_name'
    UDAS = {
        UDA_GROCY_ID: {
            'type': 'string',
            'label': 'Grocy Task ID'
        },
        UDA_GROCY_ASSIGNED_TO_USER_ID: {
            'type': 'string',
            'label': 'Grocy Assigned User ID'
        },
        UDA_GROCY_CATEGORY_ID: {
            'type': 'string',
            'label': 'Grocy Category ID'
        }
    }

    def __init__(self, **entries):
        """Initialise the base service and build the REST client.

        Args:
            entries: Keyword arguments forwarded unchanged to
                ``TaskService.__init__``.
        """
        super().__init__(**entries)
        self.tw = None
        self.rest_service = self.__initRestService()

    def get_udas(self):
        """Return the UDA definitions wrapped under the ``'uda'`` key."""
        return {
            'uda': Grocy.UDAS
        }

    def set_taskwarrior(self, tw):
        """Store the Taskwarrior client used by the merge and sync methods."""
        self.tw = tw

    def get_grocy_tw_tasks(self):
        """Return pending or waiting Taskwarrior tasks that carry every Grocy UDA.

        Raises:
            TaskwarriorError: If no Taskwarrior client has been set.
        """
        if not self.tw:
            # NOTE(review): taskw's TaskwarriorError may require
            # (command, stderr, stdout, code) -- confirm that this
            # single-argument construction works with the installed version.
            raise TaskwarriorError('taskwarrior instance not defined')
        filters = {
            'and': [('%s.any' % key, None) for key in Grocy.UDAS],
            'or': [('status', 'pending'), ('status', 'waiting')]
        }
        return self.tw.filter_tasks(filters)

    def get_category(self):
        """Return the Grocy category whose name equals ``self.category``.

        Returns:
            The matching category object, or ``None`` when the server has no
            category with that name.

        Raises:
            Exception: If the REST service is missing or the request fails.
        """
        if not self.rest_service:
            raise Exception('rest service for grocy is not defined')
        # Find specified category from all categories
        try:
            responses = self.rest_service.get(Grocy.FIND_ALL_CATEGORIES_ENDPOINT)
        except Exception as e:
            logger.error(e)
            raise
        for category in responses or ():
            if category[Grocy.NAME] == self.category:
                return category
        logger.warning('Could not find category %s in list', self.category)
        return None

    def create_category(self):
        """Ask Grocy to add the category named ``self.category``.

        Returns:
            ``None`` on success.

        Raises:
            Exception: If the REST service is missing, Grocy reports a
                failure, or no response is received.
        """
        if not self.rest_service:
            raise Exception('rest service for grocy is not defined')
        # Add new category to list
        try:
            # NOTE(review): the bare category name is posted as the payload;
            # confirm whether RestService/Grocy expect an object such as
            # {'name': ...} instead.
            response = self.rest_service.post(Grocy.ADD_CATEGORY_ENDPOINT, self.category)
            if not response:
                raise Exception('Grocy service unavailable')
            if not response.get('success'):
                raise Exception(response.get('error'))
        except Exception as e:
            logger.error(e)
            raise

    def to_grocy(self, task):
        """Convert a Taskwarrior task into a Grocy task dictionary.

        Only keys present in ``task`` are copied. When the task is tagged
        with ``self.category`` the Grocy category is looked up and created
        first if it does not exist yet.

        Args:
            task: Mapping holding a Taskwarrior task.

        Returns:
            A new dict using Grocy property names.

        Raises:
            Exception: If the category lookup/creation fails or the category
                returned by Grocy has no name.
        """
        grocy_task = {}
        if Grocy.UDA_GROCY_ID in task:
            grocy_task[Grocy.ID] = task[Grocy.UDA_GROCY_ID]
        if TaskService.UUID in task:
            grocy_task[Grocy.UDA_GROCY_TW_UUID] = task[TaskService.UUID]
        if Grocy.UDA_GROCY_ASSIGNED_TO_USER_ID in task:
            grocy_task[Grocy.ASSIGNED_TO_USER_ID] = task[Grocy.UDA_GROCY_ASSIGNED_TO_USER_ID]
        if TaskService.DESCRIPTION in task:
            grocy_task[Grocy.NAME] = task[TaskService.DESCRIPTION]
        if TaskService.TAGS in task and self.category in (task[TaskService.TAGS] or ()):
            category = self.get_category()
            if not category:
                # The category is created under the name self.category, which
                # is exactly the name get_category() matches on, so it can be
                # assigned without a second lookup.
                self.create_category()
                grocy_task[Grocy.CATEGORY] = self.category
            elif Grocy.NAME in category:
                grocy_task[Grocy.CATEGORY] = category[Grocy.NAME]
            else:
                logger.error('Grocy category does not have a name property or is undefined')
                raise Exception('Grocy category does not have a name property or is undefined')
        if TaskService.ANNOTATIONS in task:
            # One annotation per line, each terminated by a newline.
            grocy_task[Grocy.DESCRIPTION] = ''.join(
                '{}\n'.format(note) for note in task[TaskService.ANNOTATIONS]
            )
        if TaskService.ENTRY in task and isinstance(task[TaskService.ENTRY], datetime):
            grocy_task[Grocy.ROW_CREATED_TIMESTAMP] = task[TaskService.ENTRY].strftime('%Y-%m-%d %H:%M:%S')
        if TaskService.DUE in task and isinstance(task[TaskService.DUE], datetime):
            grocy_task[Grocy.DUE_DATE] = task[TaskService.DUE].strftime('%Y-%m-%d')
        if TaskService.DONE in task:
            grocy_task[Grocy.DONE] = task[TaskService.DONE]
        return grocy_task

    def to_taskwarrior(self, grocy_task):
        """Convert a Grocy task dictionary into a Taskwarrior task dictionary.

        The input is not modified.

        Args:
            grocy_task: Mapping holding a Grocy task.

        Returns:
            A new dict using Taskwarrior attribute names.
        """
        taskwarrior_task = {}
        if Grocy.ID in grocy_task:
            taskwarrior_task[Grocy.UDA_GROCY_ID] = grocy_task[Grocy.ID]
        if Grocy.UDA_GROCY_TW_UUID in grocy_task:
            taskwarrior_task['uuid'] = grocy_task[Grocy.UDA_GROCY_TW_UUID]
        if Grocy.ASSIGNED_TO_USER_ID in grocy_task:
            taskwarrior_task[Grocy.UDA_GROCY_ASSIGNED_TO_USER_ID] = grocy_task[Grocy.ASSIGNED_TO_USER_ID]
        if Grocy.NAME in grocy_task:
            taskwarrior_task['description'] = grocy_task[Grocy.NAME]
        if 'description' in grocy_task:
            # A missing description may arrive as None; treat it like ''.
            text = (grocy_task['description'] or '').replace('\r', '')
            annotations = text.split('\n')
            if annotations[-1] == '':
                # Drop the empty string left behind by a trailing newline.
                annotations = annotations[:-1]
            taskwarrior_task['annotations'] = annotations
        if Grocy.ROW_CREATED_TIMESTAMP in grocy_task:
            taskwarrior_task['entry'] = grocy_task[Grocy.ROW_CREATED_TIMESTAMP]
        if Grocy.DUE_DATE in grocy_task:
            taskwarrior_task['due'] = grocy_task[Grocy.DUE_DATE]
        # Accept both the string and the numeric form of the done flag.
        if Grocy.DONE in grocy_task and grocy_task[Grocy.DONE] in ('1', 1):
            taskwarrior_task['status'] = 'completed'
        if 'category_id' in grocy_task:
            category = self.get_category()
            logger.debug('category %s', category)
            # Only tag the task when it really belongs to the configured
            # category; ids are compared as strings because the task and the
            # category may not use the same type for them.
            if category and Grocy.ID in category \
                    and str(category[Grocy.ID]) == str(grocy_task['category_id']):
                taskwarrior_task['tags'] = [category[Grocy.NAME]]
        return taskwarrior_task

    def __initRestService(self):
        """Normalise ``self.api`` and build the REST client for Grocy.

        A single leading and a single trailing slash are removed from the
        base address, since every endpoint constant starts with ``/``.

        Returns:
            A ``RestService`` carrying the API key header.
        """
        if self.api.startswith('/'):
            self.api = self.api[1:]
        if self.api.endswith('/'):
            self.api = self.api[:-1]
        rest_service = RestService(self.api, json=True)
        rest_service.addHeader(Grocy.API_KEY_HEADER, self.token)
        return rest_service

    def findAll(self):
        """Fetch every task from Grocy.

        Returns:
            Whatever the REST service returns for the task list request.

        Raises:
            Exception: If the REST service is missing or the request fails.
        """
        if not self.rest_service:
            raise Exception('rest service for grocy is not defined')
        # Get all tasks as json
        try:
            return self.rest_service.get(Grocy.FIND_ALL_TASKS_ENDPOINT)
        except Exception as e:
            logger.error(e)
            raise

    def delete(self, id):
        """Delete the Grocy task identified by ``id``.

        Args:
            id: Identifier of the task to delete.

        Returns:
            The response of the REST service.

        Raises:
            Exception: If the REST service or ``id`` is missing, or the
                request fails.
        """
        if not self.rest_service:
            raise Exception('rest service for grocy is not defined')
        if not id:
            raise Exception('id is not defined')
        # Delete a task
        try:
            # NOTE(review): the endpoint still contains the '{0}' placeholder
            # when handed over; confirm that RestService.get substitutes the
            # second argument into it.
            return self.rest_service.get(Grocy.DELETE_TASK_ENDPOINT, id)
        except Exception as e:
            logger.error(e)
            raise

    def add(self, task):
        """Create one or several tasks in Grocy.

        Args:
            task: A Grocy-formatted task dict or a list of such dicts.

        Returns:
            A list with one response per posted task.

        Raises:
            Exception: If the REST service or ``task`` is missing, or a
                request fails.
        """
        if not self.rest_service:
            raise Exception('rest service for grocy is not defined')
        if not task:
            raise Exception('task is not defined')
        tasks = task if isinstance(task, list) else [task]
        responses = []
        try:
            for next_task in tasks:
                # Post each task on its own, not the whole input.
                responses.append(self.rest_service.post(Grocy.ADD_TASK_ENDPOINT, next_task))
        except Exception as e:
            logger.error(e)
            raise
        return responses

    def modify(self, task):
        """Update one or several existing tasks in Grocy.

        Failures are logged and stop the batch; they are not raised.

        Args:
            task: A Grocy-formatted task dict (with an ``'id'``) or a list
                of such dicts.

        Returns:
            A list with the responses of the updates that were sent before
            any failure.

        Raises:
            Exception: If the REST service or ``task`` is missing.
        """
        if not self.rest_service:
            raise Exception('rest service for grocy is not defined')
        if not task:
            raise Exception('task is not defined')
        tasks = task if isinstance(task, list) else [task]
        responses = []
        try:
            for next_task in tasks:
                modify_endpoint = '{0}/{1}'.format(Grocy.MODIFY_TASK_ENDPOINT, next_task[Grocy.ID])
                responses.append(self.rest_service.post(modify_endpoint, next_task))
        except Exception as e:
            # A generic exception has no 'stderr' attribute, so log it as is.
            logger.exception('Could not send post to modify grocy task %s', e)
        return responses

    @staticmethod
    def _task_key(task):
        """Return the task's Grocy id as a string, or ``None`` if it has none."""
        if Grocy.ID not in task or task[Grocy.ID] is None:
            return None
        return str(task[Grocy.ID])

    def _should_merge(self, converted_tw_to_grocy_task, grocy_task):
        """Tell whether both tasks refer to the same Grocy id."""
        tw_key = Grocy._task_key(converted_tw_to_grocy_task)
        return tw_key is not None and tw_key == Grocy._task_key(grocy_task)

    def _add_grocy_tasks_to_tw(self, grocy_tasks):
        """Convert each Grocy task and add it to Taskwarrior, logging failures."""
        for grocy_task in grocy_tasks:
            task = self.to_taskwarrior(grocy_task)
            try:
                self.tw.task_add(**task)
                logger.debug('Added grocy task %s to taskwarrior', task)
            except TaskwarriorError as e:
                logger.exception('Unable to add task from grocy: %s', e.stderr)

    def merge_from_tw(self, taskwarrior_tasks, grocy_tasks):
        """Push Taskwarrior tasks to Grocy, Taskwarrior winning on conflict.

        A Taskwarrior task whose Grocy id exists on the server overwrites
        that Grocy task; every other Taskwarrior task is added to Grocy.
        When there are no Taskwarrior tasks at all, the Grocy tasks are
        added to Taskwarrior instead.

        Args:
            taskwarrior_tasks: Taskwarrior tasks to push.
            grocy_tasks: Tasks currently known to Grocy.
        """
        converted_tw_to_grocy_tasks = [self.to_grocy(task) for task in taskwarrior_tasks]

        # If no taskwarrior tasks add grocy tasks to taskwarrior
        if not converted_tw_to_grocy_tasks:
            self._add_grocy_tasks_to_tw(grocy_tasks)
            return

        # Match by id instead of by list position.
        known_grocy_ids = {Grocy._task_key(grocy_task) for grocy_task in grocy_tasks}
        known_grocy_ids.discard(None)

        modified_grocy_tasks = []
        tasks_to_add_to_grocy = []
        for converted_tw_to_grocy_task in converted_tw_to_grocy_tasks:
            key = Grocy._task_key(converted_tw_to_grocy_task)
            if key is not None and key in known_grocy_ids:
                modified_grocy_tasks.append(converted_tw_to_grocy_task)
                logger.debug('Merged taskwarrior task %s to grocy task %s', key, key)
            else:
                tasks_to_add_to_grocy.append(converted_tw_to_grocy_task)
                logger.debug('Queued taskwarrior task %s to be added to grocy', converted_tw_to_grocy_task)

        # Send requests to Grocy service
        if modified_grocy_tasks:
            modify_responses = self.modify(modified_grocy_tasks)
            logger.debug(modify_responses)
        if tasks_to_add_to_grocy:
            added_responses = self.add(tasks_to_add_to_grocy)
            logger.debug(added_responses)

    def merge_into_tw(self, taskwarrior_tasks, grocy_tasks):
        """Pull Grocy tasks into Taskwarrior, Grocy winning on conflict.

        A Grocy task whose id is carried by a Taskwarrior task updates that
        Taskwarrior task; every other Grocy task is added to Taskwarrior.
        When Grocy has no tasks at all, the Taskwarrior tasks are added to
        Grocy instead.

        Args:
            taskwarrior_tasks: Taskwarrior tasks taking part in the merge.
            grocy_tasks: Tasks currently known to Grocy.
        """
        converted_tw_to_grocy_tasks = [self.to_grocy(task) for task in taskwarrior_tasks]

        # If no taskwarrior tasks add grocy tasks to taskwarrior
        if not converted_tw_to_grocy_tasks:
            self._add_grocy_tasks_to_tw(grocy_tasks)
            return
        if not grocy_tasks:
            self.add(converted_tw_to_grocy_tasks)
            return

        # Index the taskwarrior side by Grocy id; the first task wins when
        # several carry the same id.
        tw_tasks_by_grocy_id = {}
        for converted_tw_to_grocy_task in converted_tw_to_grocy_tasks:
            key = Grocy._task_key(converted_tw_to_grocy_task)
            if key is not None:
                tw_tasks_by_grocy_id.setdefault(key, converted_tw_to_grocy_task)

        for grocy_task in grocy_tasks:
            key = Grocy._task_key(grocy_task)
            match = tw_tasks_by_grocy_id.get(key) if key is not None else None

            # Add the grocy task when taskwarrior has no counterpart
            if match is None:
                try:
                    self.tw.task_add(**self.to_taskwarrior(grocy_task))
                    logger.debug('Added grocy task %s to taskwarrior', key)
                except TaskwarriorError as e:
                    logger.exception('Could not add task: %s', e.stderr)
                continue

            # Merge from grocy into taskwarrior if match found
            try:
                converted_grocy_task_to_tw = self.to_taskwarrior(grocy_task)
                converted_grocy_task_to_tw['uuid'] = match[Grocy.UDA_GROCY_TW_UUID]
                self.tw.task_update(converted_grocy_task_to_tw)
                logger.debug('Merged grocy task %s to taskwarrior task %s', key, converted_grocy_task_to_tw['uuid'])
            except TaskwarriorError as e:
                logger.exception('Could not update task: %s', e.stderr)

    def sync(self):
        """Synchronise Taskwarrior and Grocy according to ``self.resolution``.

        ``'tw'`` pushes Taskwarrior tasks to Grocy, ``'grocy'`` pulls Grocy
        tasks into Taskwarrior. Taskwarrior errors raised while merging are
        logged and not propagated.

        Raises:
            TaskwarriorError: If no Taskwarrior client has been set.
            Exception: If ``self.resolution`` is neither ``'tw'`` nor
                ``'grocy'``, or the Grocy tasks cannot be fetched.
        """
        # Get all tasks from taskwarrior
        taskwarrior_tasks = self.get_grocy_tw_tasks()
        if self.resolution not in ('tw', 'grocy'):
            raise Exception('Could not determine resolution for grocy')

        # Get all tasks from grocy (once); findAll already logs the cause.
        try:
            grocy_tasks = self.findAll() or []
        except Exception:
            logger.error('Could not get all grocy tasks')
            raise

        try:
            if self.resolution == 'tw':
                self.merge_from_tw(taskwarrior_tasks, grocy_tasks)
            else:
                self.merge_into_tw(taskwarrior_tasks, grocy_tasks)
        except TaskwarriorError as e:
            logger.exception('Could not sync tasks %s', e.stderr)