from taskw.exceptions import TaskwarriorError from twservices.apps import TaskService from datetime import datetime from twservices.apps import RestService import logging logger = logging.getLogger(__name__) class Grocy(TaskService): API_KEY_HEADER = 'GROCY-API-KEY' # Endpoints FIND_ALL_TASKS_ENDPOINT = '/get-objects/tasks' DELETE_TASK_ENDPOINT = '/delete-object/tasks/{0}' ADD_TASK_ENDPOINT = '/add-object/tasks' MODIFY_TASK_ENDPOINT = '/edit-object/tasks' # UDA GROCY_ID = 'grocy_id' UDAS = { GROCY_ID: { 'type': 'string', 'label': 'Grocy Task ID' } } def __init__(self, **entries): super().__init__(**entries) self.tw = None self.rest_service = self.__initRestService() def get_udas(self): return { 'uda': Grocy.UDAS } def set_taskwarrior(self, tw): self.tw = tw def get_grocy_tw_tasks(self): if not self.tw: raise TaskwarriorError('taskwarrior instance not defined') filters = { 'and': [('%s.any' % key, None) for key in list(Grocy.UDAS.keys())], 'or': [('status', 'pending'), ('status', 'waiting')] } tasks = self.tw.filter_tasks(filters) return tasks def to_grocy(self, task): print('before the dawn') print(task) grocy_task = {} if Grocy.GROCY_ID in task: grocy_task['id'] = task[Grocy.GROCY_ID] if 'description' in task: grocy_task['name'] = task['description'] if 'annotations' in task: grocy_task['description'] = '' for note in task['annotations']: grocy_task['description'] += '{}\n'.format(note) if 'entry' in task and type(task['entry']) is datetime: grocy_task['row_created_timestamp'] = task['entry'].strftime('%Y-%m-%d %H:%M:%S') if 'due' in task and type(task['due']) is datetime: grocy_task['due_date'] = task['due'].strftime('%Y-%m-%d') if 'done' in task: grocy_task['done'] = task['done'] return grocy_task def to_taskwarrior(self, grocy_task): taskwarrior_task = {} if 'id' in grocy_task: taskwarrior_task[Grocy.GROCY_ID] = grocy_task['id'] if 'name' in grocy_task: taskwarrior_task['description'] = grocy_task['name'] if 'description' in grocy_task: taskwarrior_task['annotations'] = grocy_task['description'].split('\n') if 'row_created_timestamp' in grocy_task: taskwarrior_task['entry'] = grocy_task['row_created_timestamp'] if 'due_date' in grocy_task: taskwarrior_task['due'] = grocy_task['due_date'] if 'done' in grocy_task and grocy_task['done'] == '1': taskwarrior_task['status'] = 'completed' return taskwarrior_task def __initRestService(self): if self.api.startswith == '/': self.api = self.api[1:] if self.api.endswith == '/': self.api = self.api[0:-1] rest_service = RestService(self.api, json=True) rest_service.addHeader(Grocy.API_KEY_HEADER, self.token) return rest_service def findAll(self): if not self.rest_service: raise Exception('rest service for grocy is not defined') # Get all tasks as json try: response = self.rest_service.get(Grocy.FIND_ALL_TASKS_ENDPOINT) except BaseException as e: logger.error(e) raise e return response def delete(self, id): if not self.rest_service: raise Exception('rest service for grocy is not defined') if not id: raise Exception('id is not defined') # Delete a task try: response = self.rest_service.get(Grocy.DELETE_TASK_ENDPOINT, id) except Exception as e: logger.error(e) raise e return response def add(self, task): if not self.rest_service: raise Exception('rest service for grocy is not defined') responses = [] if not task: raise Exception('task is not defined') if not type(task) is list: tasks = [task] else: tasks = task try: for next_task in tasks: self.to_grocy(next_task) response = self.rest_service.post(Grocy.ADD_TASK_ENDPOINT, task) responses.append(response) except Exception as e: logger.error(e) raise e return responses def modify(self, task): if not self.rest_service: raise Exception('rest service for grocy is not defined') response = [] if not task: raise Exception('task is not defined') if not type(task) is list: tasks = [task] else: tasks = task try: for next_task in tasks: modify_endpoint = '{0}/{1}'.format(Grocy.MODIFY_TASK_ENDPOINT, next_task['id']) response = self.rest_service.post(modify_endpoint, next_task) except Exception as e: logger.exception('Could not send post to modify grocy task %s', e.stderr) def _should_merge(self, converted_tw_to_grocy_task, grocy_task): return True if converted_tw_to_grocy_task['id'] == grocy_task['id'] else False def merge_from_tw(self, taskwarrior_tasks, grocy_tasks): # clone taskwarrior_tasks for processing remaining_grocy_tasks_to_process = grocy_tasks[:] # Convert to taskwarrior tasks into grocy tasks converted_tw_to_grocy_tasks = [] modified_grocy_tasks = [] tasks_to_add_to_grocy = [] converted_tw_to_grocy_tasks = [self.to_grocy(task) for task in taskwarrior_tasks] # Sort grocy tasks by id and converted taskwarrior tasks by GROCY_ID grocy_tasks.sort(key=lambda x: x['id']) converted_tw_to_grocy_tasks.sort(key=lambda x: x['id']) # If no taskwarrior tasks add grocy tasks to taskwarrior if len(converted_tw_to_grocy_tasks) == 0: convert_grocy_tasks_to_tw = [self.to_taskwarrior(grocy_task) for grocy_task in grocy_tasks] for task in convert_grocy_tasks_to_tw: try: added_task = self.tw.task_add(**task) logger.debug('Added grocy task %s to taskwarrior', task) except TaskwarriorError as e: logger.exception('Unable to add task from grocy: %s', e.stderr) # Iterate through grocy tasks to find any conflicts for index in range(len(converted_tw_to_grocy_tasks)): finished_merge_process = index > len(remaining_grocy_tasks_to_process) - 1 # Add taskwarrior task to grocy if no more grocy tasks to process if finished_merge_process: tasks_to_add_to_grocy.append(converted_tw_to_grocy_task) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) continue grocy_task = grocy_tasks[index] converted_tw_to_grocy_task = converted_tw_to_grocy_tasks[index] # Merge from grocy into taskwarrior if match found # Delete taskwarrior task if self._should_merge(converted_tw_to_grocy_task, grocy_task): modified_grocy_tasks.append(converted_tw_to_grocy_task) logger.debug('Merged taskwarrior task %s to grocy task %s', converted_tw_to_grocy_task['id'], grocy_task['id']) del remaining_grocy_tasks_to_process[index] # Add taskwarrior task into taskwarrior else: tasks_to_add_to_grocy.append(converted_tw_to_grocy_task) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) # Add any remaining taskwarrior tasks not found in grocy into grocy for index in range(len(converted_tw_to_grocy_tasks)): tw_task = self.to_taskwarrior(converted_tw_to_grocy_tasks[index]) try: self.tw.task_add(**tw_task) except TaskwarriorError as e: logger.exception('Could not add task %s', e.stderr) # Send requests to Grocy service if len(modified_grocy_tasks) > 0: self.modify(modified_grocy_tasks) if len(tasks_to_add_to_grocy) > 0: self.add(tasks_to_add_to_grocy) def merge_into_tw(self, taskwarrior_tasks, grocy_tasks): print('here') # Convert to taskwarrior tasks into grocy tasks converted_tw_to_grocy_tasks = [] converted_tw_to_grocy_tasks = [self.to_grocy(task) for task in taskwarrior_tasks] # Sort grocy tasks by id and converted taskwarrior tasks by GROCY_ID grocy_tasks.sort(key=lambda x: x['id']) converted_tw_to_grocy_tasks.sort(key=lambda x: x[Grocy.GROCY_ID]) if len(grocy_tasks) == 0: self.add(converted_tw_to_grocy_tasks) # Iterate through grocy tasks to find any conflicts for index in range(len(grocy_tasks)): grocy_task = grocy_tasks[index] converted_tw_to_grocy_task = converted_tw_to_grocy_tasks[index] current_index_exceeded_tw_list = index > len(converted_tw_to_grocy_tasks) - 1 # Add if no more taskwarrior tasks to process if current_index_exceeded_tw_list: try: self.tw.task_add(converted_tw_to_grocy_task) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) except TaskwarriorError as e: logger.exception('Could not add task: %s', e.stderr) continue # Merge from grocy into taskwarrior if match found # Delete taskwarrior task if self.should_merge(converted_tw_to_grocy_task, grocy_task): try: converted_grocy_task_to_tw = self.to_taskwarrior(grocy_task) self.tw.update(converted_grocy_task_to_tw) logger.debug('Merged grocy task %s to taskwarrior task %s', grocy_task['id'], converted_grocy_task_to_tw['uuid']) del converted_tw_to_grocy_tasks[index] except TaskwarriorError as e: logger.exception('Could not update task: %s', e.stderr) # Add grocy task into taskwarrior else: try: self.tw.task_add(converted_grocy_task_to_tw) logger.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) except TaskwarriorError as e: logger.exception('Could not add task: %s', e.stderr) # Add any remaining taskwarrior tasks not found in grocy into grocy for index in range(len(converted_tw_to_grocy_tasks)): tw_task = self.to_taskwarrior(converted_tw_to_grocy_tasks[index]) self.tw.task_add(tw_task) def sync(self): # Get all tasks from taskwarrior # Get all tasks from grocy # Push to taskwarrior taskwarrior_tasks = self.get_grocy_tw_tasks() try: grocy_tasks = self.findAll() except BaseException as e: logger.error(e) logger.error('Could not get all grocy tasks') try: if self.resolution == 'tw': self.merge_from_tw(taskwarrior_tasks, grocy_tasks) else: self.merge_into_tw(taskwarrior_tasks, grocy_tasks) except TaskwarriorError as e: logger.exception('Could not sync tasks %s', e.stderr)