#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2023 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from __future__ import print_function from __future__ import unicode_literals import sys import importlib from prompt_toolkit.shortcuts import run_application from prompt_toolkit.application import Application from prompt_toolkit.key_binding.manager import KeyBindingManager from prompt_toolkit.keys import Keys from prompt_toolkit.layout.containers import Window from prompt_toolkit.filters import IsDone from prompt_toolkit.layout.controls import TokenListControl from prompt_toolkit.layout.containers import ConditionalContainer from prompt_toolkit.layout.containers import HSplit from prompt_toolkit.layout.dimension import LayoutDimension as D from prompt_toolkit.token import Token from exceptions.ohos_exception import OHOSException from services.interface.menu_interface import MenuInterface from helper.separator import Separator from containers.arg import Arg, ModuleType from resources.config import Config from util.log_util import LogUtil from util.product_util import ProductUtil class Menu(MenuInterface): def select_compile_option(self) -> dict: config = Config() results = {} all_build_args = Arg.parse_all_args(ModuleType.BUILD) for arg in all_build_args.values(): if isinstance(arg, Arg) and arg.arg_attribute.get("optional"): if arg.arg_name != 'target_cpu': choices = [ choice if isinstance(choice, Separator) else { 'name': choice, 'value': arg.arg_name } for choice in arg.arg_attribute.get("optional") ] else: if config.support_cpu is not None and isinstance(config.support_cpu, list): choices = [] for cpu in config.support_cpu: choices.append({ 'name': cpu, 'value': arg.arg_name, }) elif config.target_cpu is not None: choices = [ { 'name': config.target_cpu, 'value': arg.arg_name, } ] else: choices = [ { 'name': all_build_args.get('target_cpu').arg_value, 'value': arg.arg_name, } ] result = self._list_promt(arg.arg_name, 'select {} value'.format( arg.arg_name), choices).get(arg.arg_name)[0] results[arg.arg_name] = result return results def select_product(self) -> dict: product_path_dict = {} company_separator = None os_level = self._select_os_level() for product_info in ProductUtil.get_products(): if product_info['os_level'] is None: raise OHOSException("") if product_info['os_level'] == os_level: company = product_info['company'] product = product_info['name'] if company_separator is None or company_separator != company: company_separator = company product_key = Separator(company_separator) product_path_dict[product_key] = None product_path_dict['{}@{}'.format(product, company)] = product_info if not len(product_path_dict): raise OHOSException('no valid product found') choices = [ product if isinstance(product, Separator) else { 'name': product.split('@')[0], 'value': product.split('@')[1] } for product in product_path_dict.keys() ] product = self._list_promt('product', 'Which product do you need?', choices).get('product') product_key = f'{product[0]}@{product[1]}' return product_path_dict.get(product_key) def _select_os_level(self) -> str: choices = [ { 'name': 'mini', 'value': 'os_level' }, { 'name': 'small', 'value': 'os_level' }, { 'name': 'standard', 'value': 'os_level' } ] return self._list_promt('os_level', 'Which os_level do you need?', choices).get('os_level')[0] def _list_promt(self, name: str, message: str, choices: list, **kwargs): questions = self._get_questions('list', name, message, choices) return self._prompt(questions=questions, **kwargs) def _get_questions(self, promt_type: str, name: str, message: str, choices: list): questions = [{ 'type': promt_type, 'qmark': 'OHOS', 'name': name, 'message': message, 'choices': choices }] return questions def _prompt(self, questions: list, answers=None, **kwargs): if isinstance(questions, dict): questions = [questions] answers = answers or {} patch_stdout = kwargs.pop('patch_stdout', False) return_asyncio_coroutine = kwargs.pop( 'return_asyncio_coroutine', False) true_color = kwargs.pop('true_color', False) refresh_interval = kwargs.pop('refresh_interval', 0) eventloop = kwargs.pop('eventloop', None) kbi_msg = kwargs.pop('keyboard_interrupt_msg', 'Cancelled by user') for question in questions: try: choices = question.get('choices') if choices is not None and callable(choices): question['choices'] = choices(answers) _kwargs = {} _kwargs.update(kwargs) _kwargs.update(question) question_type = _kwargs.pop('type') name = _kwargs.pop('name') message = _kwargs.pop('message') when = _kwargs.pop('when', None) question_filter = _kwargs.pop('filter', None) if when: # at least a little sanity check! if callable(question['when']): try: if not question['when'](answers): continue except Exception as error: raise ValueError( 'Problem in \'when\' check of %s question: %s' % (name, error)) else: raise ValueError('\'when\' needs to be function that ' 'accepts a dict argument') if question_filter: # at least a little sanity check! if not callable(question['filter']): raise ValueError('\'filter\' needs to be function that ' 'accepts an argument') if callable(question.get('default')): _kwargs['default'] = question['default'](answers) application = self._question(message, **_kwargs) answer = run_application( application, patch_stdout=patch_stdout, return_asyncio_coroutine=return_asyncio_coroutine, true_color=true_color, refresh_interval=refresh_interval, eventloop=eventloop) if answer is not None: if question_filter: try: answer = question['filter'](answer) except Exception as error: raise ValueError('Problem processing \'filter\' of' '{} question: {}'.format(name, error)) answers[name] = answer except AttributeError as attr_error: LogUtil.hb_error(attr_error) raise ValueError('No question type \'%s\'' % question_type) except KeyboardInterrupt: LogUtil.hb_warning('') LogUtil.hb_warning(kbi_msg) LogUtil.hb_warning('') return {} return answers def _question(self, message: str, **kwargs): if 'choices' not in kwargs: raise OHOSException("You must choose one platform.") choices = kwargs.pop('choices', None) qmark = kwargs.pop('qmark', '?') style = kwargs.pop('style', _get_style('terminal')) inquirer_control = InquirerControl(choices) def get_prompt_tokens(cli): tokens = [] tokens.append((Token.QuestionMark, qmark)) tokens.append((Token.Question, ' %s ' % message)) if inquirer_control.answered: tokens.append((Token.Answer, ' ' + inquirer_control.get_selection()[0])) else: tokens.append((Token.Instruction, ' (Use arrow keys)')) return tokens # assemble layout layout = HSplit([ Window(height=D.exact(1), content=TokenListControl(get_prompt_tokens)), ConditionalContainer( Window(inquirer_control), filter=~IsDone() ) ]) # key bindings manager = KeyBindingManager.for_prompt() @manager.registry.add_binding(Keys.ControlQ, eager=True) @manager.registry.add_binding(Keys.ControlC, eager=True) def _(event): raise KeyboardInterrupt() @manager.registry.add_binding(Keys.Down, eager=True) def move_cursor_down(event): def _next(): inquirer_control.selected_option_index = ( (inquirer_control.selected_option_index + 1) % inquirer_control.choice_count) _next() while isinstance(inquirer_control.choices[ inquirer_control.selected_option_index][0], Separator) \ or inquirer_control.choices[ inquirer_control.selected_option_index][2]: _next() @manager.registry.add_binding(Keys.Up, eager=True) def move_cursor_up(event): def _prev(): inquirer_control.selected_option_index = ( (inquirer_control.selected_option_index - 1) % inquirer_control.choice_count) _prev() while isinstance(inquirer_control.choices[ inquirer_control.selected_option_index][0], Separator) \ or inquirer_control.choices[ inquirer_control.selected_option_index][2]: _prev() @manager.registry.add_binding(Keys.Enter, eager=True) def set_answer(event): inquirer_control.answered = True event.cli.set_return_value(inquirer_control.get_selection()) return Application( layout=layout, key_bindings_registry=manager.registry, mouse_support=False, style=style ) class InquirerControl(TokenListControl): def __init__(self, choices: list, **kwargs): self.selected_option_index = 0 self.answered = False self.choices = choices self._init_choices(choices) super(InquirerControl, self).__init__(self._get_choice_tokens, **kwargs) @property def choice_count(self): return len(self.choices) def get_selection(self): return self.choices[self.selected_option_index] def _init_choices(self, choices: list, default=None): # helper to convert from question format to internal format self.choices = [] # list (name, value, disabled) searching_first_choice = True for index, choice in enumerate(choices): if isinstance(choice, Separator): self.choices.append((choice, None, None)) else: base_string = str if sys.version_info[0] >= 3 else None if isinstance(choice, base_string): self.choices.append((choice, choice, None)) else: name = choice.get('name') value = choice.get('value', name) disabled = choice.get('disabled', None) self.choices.append((name, value, disabled)) if searching_first_choice: self.selected_option_index = index searching_first_choice = False def _get_choice_tokens(self, cli): tokens = [] token = Token def append(index: int, choice: list): selected = (index == self.selected_option_index) @_if_mousedown def select_item(cli, mouse_event): # bind option with this index to mouse event self.selected_option_index = index self.answered = True tokens.append((token.Pointer if selected else token, ' \u276f ' if selected else ' ')) if selected: tokens.append((Token.SetCursorPosition, '')) if choice[2]: # disabled tokens.append((token.Selected if selected else token, '- %s (%s)' % (choice[0], choice[2]))) else: if isinstance(choice[0], Separator): tokens.append((token.Separator, str(choice[0]), select_item)) else: try: tokens.append((token.Selected if selected else token, str(choice[0]), select_item)) except Exception: tokens.append((token.Selected if selected else token, choice[0], select_item)) tokens.append((token, '\n')) # prepare the select choices for i, choice in enumerate(self.choices): append(i, choice) tokens.pop() # Remove last newline. return tokens def _get_style(style_type: str): style = importlib.import_module('prompt_toolkit.styles') token = importlib.import_module('prompt_toolkit.token') if style_type == 'terminal': return style.style_from_dict({ token.Token.Separator: '#75c951', token.Token.QuestionMark: '#5F819D', token.Token.Selected: '', # default token.Token.Pointer: '#FF9D00 bold', # AWS orange token.Token.Instruction: '', # default token.Token.Answer: '#FF9D00 bold', # AWS orange token.Token.Question: 'bold', }) if style_type == 'answer': return style.style_from_dict({ token.Token.Separator: '#75c951', token.Token.QuestionMark: '#E91E63 bold', token.Token.Selected: '#cc5454', # default token.Token.Pointer: '#ed9164 bold', token.Token.Instruction: '', # default token.Token.Answer: '#f44336 bold', token.Token.Question: '', }) return None def _if_mousedown(handler): def handle_if_mouse_down(cli, mouse_event): mouse_events = importlib.import_module('prompt_toolkit.mouse_events') if mouse_event.event_type == mouse_events.MouseEventTypes.MOUSE_DOWN: return handler(cli, mouse_event) else: return NotImplemented return handle_if_mouse_down