from twservices.apps import TaskService from sys import exit import twservices.Taskwarrior from datetime import datetime from twservices.apps import RestService import logging log = logging.getLogger(__name__) class Grocy(TaskService): GROCY_API_KEY_HEADER = 'GROCY_API_KEY' # Endpoints GROCY_FIND_ALL_TASKS_ENDPOINT = '/get-objects/tasks' GROCY_DELTE_TASK_ENDPOINT = '/delete-object/tasks/{0}' GROCY_ADD_TASK_ENDPOINT = '/add-object/tasks' # UDA GROCY_ID = 'grocy_id' UDAS = { GROCY_ID: { 'type': 'numeric', 'label': 'Grocy Task ID' } } def __init__(self, **entries): super(entries) self.tw = None def get_udas(): return UDAS def set_taskwarrior(tw): self.tw = tw def get_grocy_tw_tasks(): if(not self.tw): raise TaskwarriorError('taskwarrior instance not defined') return tw.filter_tasks({ 'and': [('%s.any' % key, None) for key in list(UDA.keys())], 'or': [ ('status', 'pending'), ('status', 'waiting') ] }) def to_grocy(self, task): grocy_task = {} if GROCY_ID in task grocy_task['id'] = task[GROCY_ID] if 'description' in task: grocy_task['name'] = task['description'] if 'annotations' in task: grocy_task['description'] = '' for note in task['annotations']: grocy_task['description'] += '{}\n'.format(note) if 'entry' in task and type(task['entry']) is datettime: grocy_task['row_created_timestamp'] = task['entry'].format('%Y-%m-%d') if 'due' in task and type(task['due']) is datetime: grocy_task['due_date'] = task['due'].format('%Y-%m-%d') if 'done' in task: grocy_task['done'] = task['done'] return grocy_task def __initRestService(self): if self.api.startswith == '/': api_url = self.api_url[1:] if self.api.endswith == '/': api_url = pruned_api_url = self.api rest_service = RestService(self.api, json=True) rest_service.addHeader('Authorization', self.token) return rest_service def findAll(self): # Get all tasks as json try: response = self.rest_service.get(GROCY_FIND_ALL_TASKS_ENDPOINT) except Exception as e: print(str(e)) raise e return response def delete(self, id): if not id: raise Exception('id is not defined') # Delete a task try: response = self.rest_service.get(GROCY_DELTE_TASK_ENDPOINT, id) except Exception as e: print(str(e)) raise e return response def add(self, task): responses = [] if not task: raise Exception('task is not defined') if not type(task) is list: tasks = [task] else: tasks = task try: for next_task in tasks: self.to_grocy(next_task) response = self.rest_service.post(GROCY_ADD_TASK_ENDPOINT, task) responses.append(response) except Exception as e: print(str(e)) raise e return responses def modify(self, task); response = [] if not task: raise Exception('task is not defined') if not type(task) is list: tasks = [task] else: tasks = task try: for next_task in tasks: self.to_grocy(next_task) response = self.rest_service.patch(GROCY_MODIFY_TASK_ENDPOINT, task['id'], task) responses.append(response) except Exception as e: print(str(e)) raise e return responses def sort_by_grocy_id(task_a, task_b): if 'GROCY_ID' in task_a and 'GROCY_ID' in task_b: return task_a['GROCY_ID'] - task_b['GROCY_ID'] elif 'id' in task_a and 'id' in task_b: return task_a['id'] - task_b['id'] else: return 0 def merge_from_tw(self, taskwarrior_tasks, grocy_tasks): # clone taskwarrior_tasks for processing remaining_grocy_tasks_to_process = grocy_tasks[:] # Convert to taskwarrior tasks into grocy tasks converted_tw_to_grocy_tasks = [] modified_grocy_tasks = [] for task in taskwarrior_tasks: converted_tw_to_grocy_tasks.append(self.to_grocy(task)) # Sort grocy tasks and converted taskwarrior tasks by GROCY_ID converted_tw_to_grocy_tasks.sort(self.sort_by_grocy_id) grocy_tasks.sort(self.sort_by_grocy_id) # Iterate through grocy tasks to find any conflicts for index in range(len(converted_tw_to_grocy_tasks)): grocy_task = grocy_tasks[index] converted_tw_to_grocy_task = converted_tw_to_grocy_tasks[index] finished_merge_process = index > len(remaining_grocy_tasks_to_process) - 1 # Add taskwarrior task to grocy if no more grocy tasks to process if finished_merge_process: tasks_to_add_to_grocy.append(converted_tw_to_grocy_task) log.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) continue # Merge from grocy into taskwarrior if match found # Delete taskwarrior task if self.should_merge(converted_tw_to_grocy_task, grocy_task): try: modified_grocy_tasks.append(converted_tw_to_grocy_task) log.debug('Merged taskwarrior task %s to grocy task %s', converted_tw_to_grocy_task['uuid'], grocy_task['id']) del remaining_grocy_tasks_to_process[index] except TaskwarriorError as e: log.exception('Could not update task: %s', % e.stderr) ## Add taskwarrior task into taskwarrior else: tasks_to_add_to_grocy(converted_tw_to_grocy_task) log.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) # Add any remaining taskwarrior tasks not found in grocy into grocy for index in range(len(converted_tw_to_grocy_tasks)): tw_task = self.to_taskwarrior(converted_tw_to_grocy_tasks[index]) tw.task_add(tw_task) # Send requests to Grocy service self.modify(modified_grocy_tasks) self.add(tasks_to_add_to_grocy) def merge_into_tw(self, taskwarrior_tasks, grocy_tasks): # Convert to taskwarrior tasks into grocy tasks converted_tw_to_grocy_tasks = [] for task in taskwarrior_tasks: converted_tw_to_grocy_tasks.append(self.to_grocy(task)) # Sort grocy tasks and converted taskwarrior tasks by GROCY_ID converted_tw_to_grocy_tasks.sort(self.sort_by_grocy_id) grocy_tasks.sort(self.sort_by_grocy_id) # Iterate through grocy tasks to find any conflicts for index in range(len(grocy_tasks)): grocy_task = grocy_tasks[index] converted_tw_to_grocy_task = converted_tw_to_grocy_tasks[index] current_index_exceeded_tw_list = index > len(converted_tw_to_grocy_tasks) - 1 # Add if no more taskwarrior tasks to process if current_index_exceeded_tw_list: try: tw.task_add(convert_tw_task) log.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) except TaskwarriorError as e: log.exception('Could not add task: %s', % e.stderr) continue # Merge from grocy into taskwarrior if match found # Delete taskwarrior task if self.should_merge(converted_tw_to_grocy_task, grocy_task): try: converted_grocy_task_to_tw = self.to_taskwarrior(grocy_task) tw.update(converted_grocy_task_to_tw) log.debug('Merged grocy task %s to taskwarrior task %s', grocy_task['id'], converted_grocy_task_to_tw['uuid']) del converted_tw_to_grocy_tasks[index] except TaskwarriorError as e: log.exception('Could not update task: %s', % e.stderr) ## Add grocy task into taskwarrior else: try: tw.task_add(convert_tw_task) log.debug('Added grocy task %s to taskwarrior task', grocy_task['id']) except TaskwarriorError as e: log.exception('Could not add task: %s', % e.stderr) # Add any remaining taskwarrior tasks not found in grocy into grocy for index in range(len(convert_tw_task)): tw_task = self.to_taskwarrior(converted_tw_to_grocy_tasks[index]) tw.task_add(tw_task) def sync(): # Get all tasks from taskwarrior # Get all tasks from grocy # Push to taskwarrior taskwarrior_tasks = self.get_grocy_tw_tasks() try: grocy_tasks = self.findAll() except BaseException as e: logger.error(e) logger.error('Could not get all grocy tasks') try: if self.resolution == 'tw': self.merge_from_tw(taskwarrior_tasks, grocy_tasks) else: self.merge_into_tw(taskwarrior_tasks, grocy_task) except BaseException as e: print(e) print('Could not sync')