from taskw.exceptions import TaskwarriorError from twservices.apps import TaskService from datetime import datetime from twservices.apps import RestService import logging logger = logging.getLogger(__name__) class Grocy(TaskService): API_KEY_HEADER = 'GROCY-API-KEY' # PROPS ID = 'id' DONE = 'done' DESCRIPTION = 'description' ASSIGNED_TO_USER_ID = 'assigned_to_user_id' ROW_CREATED_TIMESTAMP = 'row_created_timestamp' CATEGORY = 'category' DUE_DATE = 'due_date' NAME = 'name' # Endpoints FIND_ALL_TASKS_ENDPOINT = '/get-objects/tasks' FIND_ALL_CATEGORIES_ENDPOINT = '/get-objects/task_categories' DELETE_TASK_ENDPOINT = '/delete-object/tasks/{0}' ADD_CATEGORY_ENDPOINT = '/add-object/task_categories' ADD_TASK_ENDPOINT = '/add-object/tasks' MODIFY_TASK_ENDPOINT = '/edit-object/tasks' # UDA UDA_GROCY_ID = 'grocy_id' UDA_GROCY_TW_UUID = '_uuid' UDA_GROCY_ASSIGNED_TO_USER_ID = 'grocy_assigned_to' UDA_GROCY_CATEGORY_ID = 'grocy_category_id' UDA_GROCY_CATEGORY_NAME = 'grocy_category_name' UDAS = { UDA_GROCY_ID: { 'type': 'string', 'label': 'Grocy Task ID' }, UDA_GROCY_ASSIGNED_TO_USER_ID: { 'type': 'string', 'label': 'Grocy Assigned User ID' }, UDA_GROCY_CATEGORY_ID: { 'type': 'string', 'label': 'Grocy Category ID' } } def __init__(self, **entries): super().__init__(**entries) self.tw = None self.rest_service = self.__initRestService() def get_udas(self): return { 'uda': Grocy.UDAS } def set_taskwarrior(self, tw): self.tw = tw def get_grocy_tw_tasks(self): if not self.tw: raise TaskwarriorError('taskwarrior instance not defined') filters = { 'and': [('%s.any' % key, None) for key in list(Grocy.UDAS.keys())], 'or': [('status', 'pending'), ('status', 'waiting')] } tasks = self.tw.filter_tasks(filters) return tasks def get_category(self): if not self.rest_service: raise Exception('rest service for grocy is not defined') # Find specified category from all categories try: responses = self.rest_service.get(Grocy.FIND_ALL_CATEGORIES_ENDPOINT) except BaseException as e: logger.error(e) raise e if responses: for category in responses: if category['name'] == self.category: return category logger.warning('Could not find category {} in list'.format(self.category)) return None def create_category(self): if not self.rest_service: raise Exception('rest service for grocy is not defined') # Add new category to list try: response = self.rest_service.post(Grocy.ADD_CATEGORY_ENDPOINT, self.category) if response and response['success']: return elif response and not response['success']: raise Exception(response['error']) else: raise Exception('Grocy service unavailable Service') except Exception as e: logger.error(e) raise e def to_grocy(self, task): grocy_task = {} if Grocy.UDA_GROCY_ID in task: grocy_task[Grocy.ID] = task[Grocy.UDA_GROCY_ID] if TaskService.UUID in task: grocy_task[Grocy.UDA_GROCY_TW_UUID] = task[TaskService.UUID] if Grocy.UDA_GROCY_ASSIGNED_TO_USER_ID in task: grocy_task[Grocy.ASSIGNED_TO_USER_ID] = task[Grocy.UDA_GROCY_ASSIGNED_TO_USER_ID] if TaskService.DESCRIPTION in task: grocy_task[Grocy.NAME] = task[TaskService.DESCRIPTION] if TaskService.TAGS in task and self.category in task[TaskService.TAGS]: category = self.get_category() category_does_not_exist = not category if category_does_not_exist: category = self.create_category() elif category and 'name' in category: grocy_task[Grocy.CATEGORY] = category['name'] else: logger.error('Grocy category does not have a name property or is undefined') raise Exception('Grocy category does not have a name property or is undefined') if TaskService.ANNOTATIONS in task: grocy_task[Grocy.DESCRIPTION] = '' for note in task[TaskService.ANNOTATIONS]: grocy_task[Grocy.DESCRIPTION] += '{}\n'.format(note) if TaskService.ENTRY in task and type(task[TaskService.ENTRY]) is datetime: grocy_task[Grocy.ROW_CREATED_TIMESTAMP] = task[TaskService.ENTRY].strftime('%Y-%m-%d %H:%M:%S') if TaskService.DUE in task and type(task[TaskService.DUE]) is datetime: grocy_task[Grocy.DUE_DATE] = task[TaskService.DUE].strftime('%Y-%m-%d') if TaskService.DONE in task: grocy_task[Grocy.DONE] = task[TaskService.DONE] return grocy_task def to_taskwarrior(self, grocy_task): taskwarrior_task = {} if Grocy.ID in grocy_task: taskwarrior_task[Grocy.UDA_GROCY_ID] = grocy_task[Grocy.ID] if Grocy.UDA_GROCY_TW_UUID in grocy_task: taskwarrior_task[TaskService.UUID] = grocy_task[Grocy.UDA_GROCY_TW_UUID] if Grocy.ASSIGNED_TO_USER_ID in grocy_task: taskwarrior_task[Grocy.UDA_GROCY_ASSIGNED_TO_USER_ID] = grocy_task[Grocy.ASSIGNED_TO_USER_ID] if Grocy.NAME in grocy_task: taskwarrior_task[TaskService.DESCRIPTION] = grocy_task[Grocy.NAME] if Grocy.DESCRIPTION in grocy_task: if '\r' in grocy_task[Grocy.DESCRIPTION]: grocy_task[Grocy.DESCRIPTION] = grocy_task[Grocy.DESCRIPTION].replace('\r', '') taskwarrior_task[TaskService.ANNOTATIONS] = grocy_task[Grocy.DESCRIPTION].split('\n') last_element_is_empty_string = taskwarrior_task[TaskService.ANNOTATIONS][-1] == '' if last_element_is_empty_string: taskwarrior_task[TaskService.ANNOTATIONS] = taskwarrior_task[TaskService.ANNOTATIONS][0:-1] # Remove empty string in array if Grocy.ROW_CREATED_TIMESTAMP in grocy_task: taskwarrior_task[TaskService.ENTRY] = grocy_task[Grocy.ROW_CREATED_TIMESTAMP] if Grocy.DUE_DATE in grocy_task: taskwarrior_task[TaskService.DUE] = grocy_task[Grocy.DUE_DATE] if Grocy.DONE in grocy_task and grocy_task[Grocy.DONE] == '1': taskwarrior_task[TaskService.STATUS] = TaskService.COMPLETED if Grocy.CATEGORY in grocy_task: category = self.get_category() if category: taskwarrior_task[TaskService.TAGS] = [ category['name'] ] return taskwarrior_task def __initRestService(self): if self.api.startswith == '/': self.api = self.api[1:] if self.api.endswith == '/': self.api = self.api[1:-1] rest_service = RestService(self.api, json=True) rest_service.addHeader(Grocy.API_KEY_HEADER, self.token) return rest_service def find_all(self): if not self.rest_service: raise Exception('rest service for grocy is not defined') # Get all tasks as json try: responses = self.rest_service.get(Grocy.FIND_ALL_TASKS_ENDPOINT) except BaseException as e: logger.error(e) raise e return responses def delete(self, id): if not self.rest_service: raise Exception('rest service for grocy is not defined') if not id: raise Exception('id is not defined') # Delete a task try: response = self.rest_service.get(Grocy.DELETE_TASK_ENDPOINT, id) if response and response['success']: return response elif response and not response['success']: raise Exception(response['error']) else: raise Exception('Grocy service unavailable Service') except Exception as e: logger.error(e) raise e def add(self, grocy_task): if not self.rest_service: raise Exception('rest service for grocy is not defined') responses = [] if not grocy_task: raise Exception('grocy_task is not defined') if not type(grocy_task) is list: grocy_tasks = [grocy_task] else: grocy_tasks = grocy_task try: for next_grocy_task in grocy_tasks: response = self.rest_service.post(Grocy.ADD_TASK_ENDPOINT, next_grocy_task) responses.append(response) except Exception as e: logger.error(e) raise e return responses def modify(self, grocy_task): if not self.rest_service: raise Exception('rest service for grocy is not defined') responses = [] if not grocy_task: raise Exception('grocy_task is not defined') if not type(grocy_task) is list: grocy_tasks = [grocy_task] else: grocy_tasks = grocy_task try: for grocy_next_task in grocy_tasks: modify_endpoint = '{0}/{1}'.format(Grocy.MODIFY_TASK_ENDPOINT, grocy_next_task['id']) response = self.rest_service.post(modify_endpoint, grocy_next_task) responses.append(response) except Exception as e: logger.exception('Could not send post to modify grocy grocy_task %s', e.stderr) raise e return responses def _should_merge(self, converted_tw_to_grocy_task, grocy_task): return True if converted_tw_to_grocy_task['id'] == grocy_task['id'] else False def merge_from_tw(self, taskwarrior_tasks, grocy_tasks): # clone taskwarrior_tasks for processing remaining_grocy_tasks_to_process = grocy_tasks[:] # Convert to taskwarrior tasks into grocy tasks converted_tw_to_grocy_tasks = [] modified_grocy_tasks = [] tasks_to_add_to_grocy = [] converted_tw_to_grocy_tasks = [self.to_grocy(task) for task in taskwarrior_tasks] # Sort grocy tasks by id and converted taskwarrior tasks by UDA_GROCY_ID grocy_tasks.sort(key=lambda x: x['id']) converted_tw_to_grocy_tasks.sort(key=lambda x: x['id']) # If no taskwarrior tasks add grocy tasks to taskwarrior if len(converted_tw_to_grocy_tasks) == 0: convert_grocy_tasks_to_tw = [self.to_taskwarrior(grocy_task) for grocy_task in grocy_tasks] for task in convert_grocy_tasks_to_tw: try: added_task = self.tw.task_add(**task) logger.debug('Added grocy task %s to taskwarrior', task) except TaskwarriorError as e: logger.exception('Unable to add task from grocy: %s', e.stderr) # Iterate through grocy tasks to find any conflicts for index in range(len(converted_tw_to_grocy_tasks)): finished_merge_process = index > len(remaining_grocy_tasks_to_process) - 1 # Add taskwarrior task to grocy if no more grocy tasks to process if finished_merge_process: tasks_to_add_to_grocy.append(converted_tw_to_grocy_task) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) continue grocy_task = grocy_tasks[index] converted_tw_to_grocy_task = converted_tw_to_grocy_tasks[index] # Merge from grocy into taskwarrior if match found # Delete taskwarrior task if self._should_merge(converted_tw_to_grocy_task, grocy_task): modified_grocy_tasks.append(converted_tw_to_grocy_task) logger.debug('Merged taskwarrior task %s to grocy task %s', converted_tw_to_grocy_task['id'], grocy_task['id']) del remaining_grocy_tasks_to_process[index] # Add taskwarrior task into taskwarrior else: tasks_to_add_to_grocy.append(converted_tw_to_grocy_task) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) # Send requests to Grocy service if len(modified_grocy_tasks) > 0: modify_responses = self.modify(modified_grocy_tasks) logger.debug(modify_responses) if len(tasks_to_add_to_grocy) > 0: added_responses = self.add(tasks_to_add_to_grocy) logger.debug(added_responses) def merge_into_tw(self, taskwarrior_tasks, grocy_tasks): # Convert to taskwarrior tasks into grocy tasks converted_tw_to_grocy_tasks = [] converted_tw_to_grocy_tasks = [self.to_grocy(task) for task in taskwarrior_tasks] # Sort grocy tasks by id and converted taskwarrior tasks by UDA_GROCY_ID grocy_tasks.sort(key=lambda x: x['id']) converted_tw_to_grocy_tasks.sort(key=lambda x: x['id']) # If no taskwarrior tasks add grocy tasks to taskwarrior if len(converted_tw_to_grocy_tasks) == 0: convert_grocy_tasks_to_tw = [self.to_taskwarrior(grocy_task) for grocy_task in grocy_tasks] for task in convert_grocy_tasks_to_tw: try: added_task = self.tw.task_add(**task) logger.debug('Added grocy task %s to taskwarrior', task) except TaskwarriorError as e: logger.exception('Unable to add task from grocy: %s', e.stderr) return if len(grocy_tasks) == 0: self.add(converted_tw_to_grocy_tasks) # Iterate through grocy tasks to find any conflicts for index in range(len(grocy_tasks)): grocy_task = grocy_tasks[index] converted_tw_to_grocy_task = converted_tw_to_grocy_tasks[index] current_index_exceeded_tw_list = index > len(converted_tw_to_grocy_tasks) - 1 # Add if no more taskwarrior tasks to process if current_index_exceeded_tw_list: try: self.tw.task_add(converted_tw_to_grocy_task) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) except TaskwarriorError as e: logger.exception('Could not add task: %s', e.stderr) continue # Merge from grocy into taskwarrior if match found # Delete taskwarrior task if self._should_merge(converted_tw_to_grocy_task, grocy_task): try: converted_grocy_task_to_tw = self.to_taskwarrior(grocy_task) converted_grocy_task_to_tw['uuid'] = converted_tw_to_grocy_task[Grocy.UDA_GROCY_TW_UUID] self.tw.task_update(converted_grocy_task_to_tw) logger.debug('Merged grocy task %s to taskwarrior task %s', grocy_task['id'], converted_grocy_task_to_tw['uuid']) del converted_tw_to_grocy_tasks[index] except TaskwarriorError as e: logger.exception('Could not update task: %s', e.stderr) def sync(self): # Get all tasks from taskwarrior # Get all tasks from grocy # Push to taskwarrior taskwarrior_tasks = self.get_grocy_tw_tasks() try: grocy_tasks = self.find_all() except BaseException as e: logger.error(e) logger.error('Could not get all grocy tasks') try: if self.resolution == 'tw': grocy_tasks = self.find_all() self.merge_from_tw(taskwarrior_tasks, grocy_tasks) elif self.resolution == 'grocy': grocy_tasks = self.find_all() self.merge_into_tw(taskwarrior_tasks, grocy_tasks) else: raise Exception('Could not determine resolution for grocy') except TaskwarriorError as e: logger.exception('Could not sync tasks %s', e.stderr)