# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True
"""PCRE2-backed pattern matching and a small nginx-style router.

The module exposes:

* ``PCRE2Pattern``   - a compiled PCRE2 pattern (JIT-compiled when possible).
* ``FastRouteMatch`` - the result object returned by ``FastRouter.match``.
* ``FastRouter``     - exact / regex / default route lookup.
* ``compile_pattern`` and ``fast_match`` - thin convenience wrappers.
"""

import warnings
from typing import Optional

from libc.stdint cimport uint32_t
from libc.stddef cimport size_t
from cpython.bytes cimport PyBytes_AsString, PyBytes_GET_SIZE
cimport cython

from ._routing_pcre2 cimport *

# Type aliases for cleaner NULL casts
ctypedef pcre2_compile_context* compile_ctx_ptr
ctypedef pcre2_match_context* match_ctx_ptr
ctypedef pcre2_general_context* general_ctx_ptr

# Buffer size for error messages
DEF ERROR_BUFFER_SIZE = 256

# Maximum capture groups we support
DEF MAX_CAPTURE_GROUPS = 32


cdef class PCRE2Pattern:
    """A compiled PCRE2 pattern together with its reusable match-data block.

    Instances are built through ``PCRE2Pattern._create`` (or the module-level
    ``compile_pattern``); a bare ``PCRE2Pattern()`` holds no compiled code and
    every matching method then reports "no match".

    The single ``_match_data`` block is reused by every call, so the result
    of one match is overwritten by the next one on the same instance.
    """

    cdef:
        pcre2_code* _code
        pcre2_match_data* _match_data
        bint _jit_available
        str _pattern_str
        uint32_t _capture_count
        dict _name_to_index   # Named capture groups: name -> group number
        list _index_to_name   # Group number -> name (None for unnamed groups)

    def __cinit__(self):
        self._code = NULL
        self._match_data = NULL
        self._jit_available = False
        self._capture_count = 0
        self._name_to_index = {}
        self._index_to_name = []

    def __dealloc__(self):
        # Release the match data before the code it was created from.
        if self._match_data is not NULL:
            pcre2_match_data_free(self._match_data)
            self._match_data = NULL
        if self._code is not NULL:
            pcre2_code_free(self._code)
            self._code = NULL

    @staticmethod
    cdef PCRE2Pattern _create(str pattern, bint case_insensitive=False, bint use_jit=True):
        """Compile ``pattern`` and return a ready-to-use ``PCRE2Pattern``.

        The pattern is encoded as UTF-8 and compiled with
        ``PCRE2_UTF | PCRE2_UCP`` (plus ``PCRE2_CASELESS`` when
        ``case_insensitive`` is true).  When ``use_jit`` is true a JIT
        compile is attempted; failure of that step is not an error, the
        pattern simply falls back to the interpreter.

        Raises:
            ValueError: the pattern does not compile.
            MemoryError: the match-data block could not be allocated.
        """
        cdef:
            PCRE2Pattern self = PCRE2Pattern.__new__(PCRE2Pattern)
            bytes pattern_bytes
            const char* pattern_ptr
            Py_ssize_t pattern_len
            uint32_t options = PCRE2_UTF | PCRE2_UCP
            int errorcode = 0
            PCRE2_SIZE erroroffset = 0
            int jit_result
            uint32_t capture_count = 0

        self._pattern_str = pattern
        self._name_to_index = {}
        self._index_to_name = []

        pattern_bytes = pattern.encode('utf-8')
        pattern_ptr = PyBytes_AsString(pattern_bytes)
        pattern_len = PyBytes_GET_SIZE(pattern_bytes)

        if case_insensitive:
            options |= PCRE2_CASELESS

        self._code = pcre2_compile(
            <PCRE2_SPTR>pattern_ptr,
            <PCRE2_SIZE>pattern_len,
            options,
            &errorcode,
            &erroroffset,
            <compile_ctx_ptr>NULL,
        )
        if self._code is NULL:
            error_msg = PCRE2Pattern._get_error_message(errorcode)
            raise ValueError(f"PCRE2 compile error at offset {erroroffset}: {error_msg}")

        if use_jit:
            jit_result = pcre2_jit_compile(self._code, PCRE2_JIT_COMPLETE)
            self._jit_available = (jit_result == 0)

        # capture_count stays 0 if the query fails.
        pcre2_pattern_info(self._code, PCRE2_INFO_CAPTURECOUNT, &capture_count)
        self._capture_count = capture_count

        self._match_data = pcre2_match_data_create_from_pattern(
            self._code, <general_ctx_ptr>NULL
        )
        if self._match_data is NULL:
            pcre2_code_free(self._code)
            self._code = NULL
            raise MemoryError("Failed to create match data")

        self._extract_named_groups()
        return self

    cdef void _extract_named_groups(self):
        """Fill ``_name_to_index`` / ``_index_to_name`` from the name table.

        Each name-table entry is read as a 2-byte big-endian group number
        followed by the NUL-terminated group name; entries are
        ``nameentrysize`` bytes apart.  If any of the info queries fails the
        mappings are left as they are.
        """
        cdef:
            uint32_t namecount = 0
            uint32_t nameentrysize = 0
            PCRE2_SPTR nametable = NULL
            uint32_t i
            uint32_t group_num
            bytes name_bytes
            str name

        if pcre2_pattern_info(self._code, PCRE2_INFO_NAMECOUNT, &namecount) != 0:
            return
        if namecount == 0:
            return
        if pcre2_pattern_info(self._code, PCRE2_INFO_NAMEENTRYSIZE, &nameentrysize) != 0:
            return
        if pcre2_pattern_info(self._code, PCRE2_INFO_NAMETABLE, &nametable) != 0:
            return
        if nametable is NULL:
            return

        self._index_to_name = [None] * (self._capture_count + 1)

        for i in range(namecount):
            group_num = ((<uint32_t>nametable[0]) << 8) | (<uint32_t>nametable[1])
            # Coercing a C string to bytes copies up to the terminating NUL.
            name_bytes = <const char*>(nametable + 2)
            name = name_bytes.decode('utf-8')
            # NOTE: if the pattern declares the same name more than once,
            # the entry read last wins.
            self._name_to_index[name] = group_num
            if group_num <= self._capture_count:
                self._index_to_name[group_num] = name
            nametable += nameentrysize

    @staticmethod
    cdef str _get_error_message(int errorcode):
        """Return the PCRE2 message for ``errorcode``.

        Falls back to ``"Unknown error <code>"`` when
        ``pcre2_get_error_message`` reports a negative result.
        """
        cdef:
            PCRE2_UCHAR buffer[ERROR_BUFFER_SIZE]
            int result

        result = pcre2_get_error_message(errorcode, buffer, ERROR_BUFFER_SIZE)
        if result < 0:
            return f"Unknown error {errorcode}"
        # A non-negative result is the message length; decode exactly that.
        return (<const char*>&buffer[0])[:result].decode('utf-8')

    cdef int _run_match(self, bytes subject_bytes):
        """Match the pattern against ``subject_bytes`` from offset 0.

        Uses ``pcre2_jit_match`` when the JIT compile succeeded, otherwise
        ``pcre2_match``.  Returns the raw PCRE2 result code (negative means
        no match or an error).  The caller must have checked that
        ``_code`` is not NULL.
        """
        cdef:
            PCRE2_SPTR subject_ptr
            PCRE2_SIZE subject_len

        subject_ptr = <PCRE2_SPTR>PyBytes_AsString(subject_bytes)
        subject_len = <PCRE2_SIZE>PyBytes_GET_SIZE(subject_bytes)

        if self._jit_available:
            return pcre2_jit_match(
                self._code,
                subject_ptr,
                subject_len,
                0,  # start offset
                0,  # options
                self._match_data,
                <match_ctx_ptr>NULL,
            )
        return pcre2_match(
            self._code,
            subject_ptr,
            subject_len,
            0,  # start offset
            0,  # options
            self._match_data,
            <match_ctx_ptr>NULL,
        )

    cdef dict _named_groups(self, bytes subject_bytes):
        """Build ``{name: text-or-None}`` from the most recent successful match.

        ``subject_bytes`` must be the exact bytes that were just matched;
        the stored offsets are byte offsets into it.
        """
        cdef:
            PCRE2_SIZE* ovector = pcre2_get_ovector_pointer(self._match_data)
            dict groups = {}
            str name
            int index
            PCRE2_SIZE start, end

        for name, index in self._name_to_index.items():
            start = ovector[<size_t>(2 * index)]
            end = ovector[<size_t>(2 * index + 1)]
            if start != PCRE2_UNSET and end != PCRE2_UNSET:
                groups[name] = subject_bytes[start:end].decode('utf-8')
            else:
                # The group exists but did not take part in the match.
                groups[name] = None
        return groups

    cpdef bint search(self, str subject):
        """
        Search for pattern anywhere in subject.
        Returns True if found, False otherwise.
        """
        if self._code is NULL:
            return False
        return <bint>(self._run_match(subject.encode('utf-8')) >= 0)

    cpdef dict groupdict(self, str subject):
        """
        Match pattern and return dict of named groups.
        Returns empty dict if no match or no named groups.
        """
        cdef bytes subject_bytes

        if self._code is NULL or not self._name_to_index:
            return {}

        subject_bytes = subject.encode('utf-8')
        if self._run_match(subject_bytes) < 0:
            return {}
        return self._named_groups(subject_bytes)

    cpdef tuple search_with_groups(self, str subject):
        """
        Search for pattern and collect named groups in one pass.
        Returns (True, groups) on a match and (False, {}) otherwise;
        groups is empty when the pattern has no named groups.
        """
        cdef bytes subject_bytes

        if self._code is NULL:
            return (False, {})

        subject_bytes = subject.encode('utf-8')
        if self._run_match(subject_bytes) < 0:
            return (False, {})

        if not self._name_to_index:
            return (True, {})
        return (True, self._named_groups(subject_bytes))

    @property
    def pattern(self) -> str:
        return self._pattern_str

    @property
    def jit_compiled(self) -> bool:
        return self._jit_available

    @property
    def capture_count(self) -> int:
        return self._capture_count


cdef class FastRouteMatch:
    """Result of a successful route lookup: the route config plus params."""

    cdef:
        public dict config
        public dict params

    def __cinit__(self):
        self.config = {}
        self.params = {}

    def __init__(self, dict config, params=None):
        self.config = config
        self.params = params if params is not None else {}


cdef class FastRouter:
    """
    High-performance router with PCRE2 JIT-compiled patterns.

    Matching order (nginx-like):
    1. Exact routes (prefix "=") - O(1) dict lookup
    2. Regex routes (prefix "~" or "~*") - PCRE2 JIT matching
    3. Default route (fallback)
    """

    cdef:
        dict _exact_routes
        list _regex_routes
        dict _default_route
        bint _has_default
        int _regex_count

    def __cinit__(self):
        self._exact_routes = {}
        self._regex_routes = []
        self._default_route = {}
        self._has_default = False
        self._regex_count = 0

    def __init__(self):
        self._exact_routes = {}
        self._regex_routes = []
        self._default_route = {}
        self._has_default = False
        self._regex_count = 0

    def add_route(self, str pattern, dict config):
        """Register ``config`` under ``pattern``.

        * ``"=<path>"``    - exact route for ``<path>``.
        * ``"__default__"`` - fallback route.
        * ``"~<regex>"``   - case-sensitive regex route.
        * ``"~*<regex>"``  - case-insensitive regex route.

        A regex that fails to compile is skipped and reported with a
        ``RuntimeWarning``.  Patterns of any other shape are ignored.
        """
        cdef:
            str regex_pattern
            bint case_insensitive
            PCRE2Pattern compiled_pattern

        if pattern.startswith("="):
            self._exact_routes[pattern[1:]] = config
        elif pattern == "__default__":
            self._default_route = config
            self._has_default = True
        elif pattern.startswith("~"):
            case_insensitive = pattern.startswith("~*")
            regex_pattern = pattern[2:] if case_insensitive else pattern[1:]
            try:
                compiled_pattern = PCRE2Pattern._create(regex_pattern, case_insensitive)
            except (ValueError, MemoryError) as exc:
                # Still best-effort: the route is skipped, but not silently.
                warnings.warn(
                    f"Skipping route {pattern!r}: {exc}",
                    RuntimeWarning,
                )
                return
            self._regex_routes.append((compiled_pattern, config))
            self._regex_count = len(self._regex_routes)

    cpdef object match(self, str path):
        """Return a ``FastRouteMatch`` for ``path``, or ``None``.

        Exact routes are tried first, then regex routes in the order they
        were added, then the default route if one was registered.
        """
        cdef:
            dict config
            dict params
            PCRE2Pattern pattern
            bint matched

        if path in self._exact_routes:
            config = self._exact_routes[path]
            return FastRouteMatch(config, {})

        for pattern, config in self._regex_routes:
            matched, params = pattern.search_with_groups(path)
            if matched:
                return FastRouteMatch(config, params)

        if self._has_default:
            return FastRouteMatch(self._default_route, {})

        return None

    @property
    def exact_routes(self) -> dict:
        return self._exact_routes

    @property
    def routes(self) -> dict:
        """Return regex routes as dict (pattern_str -> config)."""
        cdef:
            dict result = {}
            PCRE2Pattern pattern
        for pattern, config in self._regex_routes:
            result[pattern.pattern] = config
        return result

    @property
    def default_route(self) -> Optional[dict]:
        return self._default_route if self._has_default else None

    cpdef list list_routes(self):
        """Return every registered route as a list of description dicts.

        Order: exact routes, regex routes, then the default route.
        """
        cdef:
            list result = []
            str path_str
            dict config
            PCRE2Pattern pattern

        for path_str, config in self._exact_routes.items():
            result.append({
                "type": "exact",
                "pattern": f"={path_str}",
                "config": config,
            })

        for pattern, config in self._regex_routes:
            result.append({
                "type": "regex",
                "pattern": pattern.pattern,
                "jit_compiled": pattern.jit_compiled,
                "config": config,
            })

        if self._has_default:
            result.append({
                "type": "default",
                "pattern": "__default__",
                "config": self._default_route,
            })

        return result


def compile_pattern(str pattern, bint case_insensitive=False) -> PCRE2Pattern:
    """
    Compile a PCRE2 pattern with JIT support.

    Args:
        pattern: Regular expression pattern
        case_insensitive: Whether to match case-insensitively

    Returns:
        Compiled PCRE2Pattern object
    """
    return PCRE2Pattern._create(pattern, case_insensitive)


def fast_match(router: FastRouter, str path):
    """
    Convenience function for matching a path.

    Args:
        router: FastRouter instance
        path: URL path to match

    Returns:
        FastRouteMatch or None
    """
    return router.match(path)