def convert_rate_to_percent(rate: float) -> str:
    """Format a rate multiplier as a signed whole-percent offset from 1.0.

    The offset ``(rate - 1.0) * 100`` is rounded to the nearest integer and
    rendered with an explicit ``+`` for non-negative values, e.g.
    ``1.0 -> '+0%'``, ``1.25 -> '+25%'``, ``0.8 -> '-20%'``.

    Rates that merely *round* to zero (such as ``1.001``) are rendered as
    ``'+0%'`` as well, so a zero offset always has the same spelling as the
    exact ``rate == 1.0`` case.
    """
    percent = round((rate - 1.0) * 100)
    # Negative numbers already carry their own '-'; only non-negative values
    # (including zero) need the explicit '+'.
    sign = '+' if percent >= 0 else ''
    return f'{sign}{percent}%'


async def _install(self, package: str, *, upgrade: bool) -> bool:
    """Install or upgrade ``package`` by invoking ``rye`` through ``self.run``.

    Args:
        package: Package specification; it is expanded into individual
            command-line arguments with ``split_packages``.
        upgrade: When true, run ``rye sync --update <packages>``; otherwise
            run ``rye add <packages>``.

    Returns:
        Whatever ``self.run`` returns for the assembled command.
    """
    # NOTE(review): the result of self.run() is returned without ``await``;
    # confirm that ``run`` is synchronous, otherwise callers get a coroutine.
    packages = split_packages(package)
    if upgrade:
        return self.run(['rye', 'sync', '--update', *packages])
    return self.run(['rye', 'add', *packages])


def flow_warp(feature, flow, mask=False, padding_mode='zeros'):
    """Sample ``feature`` at the base coordinate grid displaced by ``flow``.

    ``feature.size()`` must unpack into four values (b, c, h, w) and ``flow``
    must have size 2 along dimension 1. The grid from ``coords_grid(b, h, w)``
    is moved to ``flow``'s device and offset by ``flow`` before being handed
    to ``bilinear_sample``.

    Args:
        feature: Tensor to sample from.
        flow: Displacement added to the coordinate grid.
        mask: Forwarded to ``bilinear_sample`` as ``return_mask``.
        padding_mode: Forwarded to ``bilinear_sample`` unchanged.

    Returns:
        The result of ``bilinear_sample``.
    """
    b, _, h, w = feature.size()
    assert flow.size(1) == 2
    base_grid = coords_grid(b, h, w).to(flow.device)
    grid = base_grid + flow
    return bilinear_sample(
        feature, grid, padding_mode=padding_mode, return_mask=mask
    )


def test_resolve_array_noop(executing_kernel: Kernel) -> None:
    """``resolve_lens`` on a ``ui.array`` returns its arguments unchanged."""
    # The fixture is requested only for its setup side effects.
    del executing_kernel
    array = ui.array([ui.text(), ui.slider(1, 10)])
    registry = get_context().ui_element_registry
    value = {'0': 'hello world'}
    expected = (array._id, {'0': 'hello world'})
    assert registry.resolve_lens(array._id, value) == expected


def _set_initial_state(
    self, state: State | None, max_iterations: int = MAX_ITERATIONS
):
    """Store ``state`` on ``self.state``, creating a fresh one when missing.

    Args:
        state: Existing state to adopt, or ``None`` to build a new ``State``
            with empty ``inputs`` and the given ``max_iterations``.
        max_iterations: Used only when a new ``State`` is created.
    """
    if state is not None:
        self.state = state
        return
    self.state = State(inputs={}, max_iterations=max_iterations)


def draw_text_det_res(dt_boxes, img_path):
    """Draw each box in ``dt_boxes`` as a closed polyline on an image.

    Args:
        dt_boxes: Iterable of boxes; each is converted to an ``int32`` array
            and reshaped to rows of two values before drawing.
        img_path: Path passed to ``cv2.imread``.

    Returns:
        The image returned by ``cv2.imread`` after the polylines were drawn.
    """
    src_im = cv2.imread(img_path)
    for box in dt_boxes:
        points = np.array(box).astype(np.int32).reshape(-1, 2)
        cv2.polylines(src_im, [points], True, color=(255, 255, 0), thickness=2)
    return src_im


def __init__(
    self, connection: Catalog, engine_name: Optional[VariableName] = None
) -> None:
    """Initialise the base class, then cache the default database and schema.

    Args:
        connection: Forwarded to the parent initialiser.
        engine_name: Forwarded to the parent initialiser.
    """
    super().__init__(connection, engine_name)
    self.default_database = self.get_default_database()
    self.default_schema = self.get_default_schema()


def _check_edges(error: Error, expected_edges: Sequence[EdgeWithVar]) -> None:
    """Assert that ``error`` is a ``CycleError`` with exactly the given edges.

    The comparison ignores order: the edge counts must match and every
    expected edge must be present in ``error.edges_with_vars``.
    """
    assert isinstance(error, CycleError)
    actual_edges = error.edges_with_vars
    assert len(actual_edges) == len(expected_edges)
    for edge in expected_edges:
        assert edge in actual_edges


def easy_data(csv):
    """Build a ``VariableVideoTextDataset`` from ``csv`` and fetch one item.

    Args:
        csv: Passed to the dataset configuration as ``data_path``.

    Returns:
        The dataset entry stored under the key ``'0-113-360-640'``.
    """
    # Imported lazily so the dependency is only needed when this is called.
    from opensora.registry import DATASETS, build_module

    config = {
        'type': 'VariableVideoTextDataset',
        'transform_name': 'resize_crop',
        'data_path': csv,
    }
    dataset = build_module(config, DATASETS)
    return dataset['0-113-360-640']


async def monitor_idle_memory(
    duration_seconds: int = 60, interval_seconds: int = 1
):
    """Call ``log_current_memory`` repeatedly until the duration has elapsed.

    Args:
        duration_seconds: How long, in wall-clock seconds, to keep sampling.
        interval_seconds: Delay passed to ``asyncio.sleep`` between samples.
    """
    deadline = time.time() + duration_seconds
    while time.time() < deadline:
        log_current_memory()
        await asyncio.sleep(interval_seconds)


def test_extract_model_and_provider_dot_format(self):
    """A ``provider.model`` string is split on the dot separator."""
    model = 'anthropic.claude-3-7'
    result = extract_model_and_provider(model)
    assert result['provider'] == 'anthropic'
    assert result['model'] == 'claude-3-7'
    assert result['separator'] == '.'


def test_read_session_view_no_path(self):
    """With no path configured, reading returns the manager's own view."""
    view = SessionView()
    manager = SessionCacheManager(view, None, 0.1)
    key = SessionCacheKey(codes=tuple(), marimo_version='-1')
    assert manager.read_session_view(key) == view