From d1e2636c469f7c58bca900383695317f7c9c2ce4 Mon Sep 17 00:00:00 2001
From: Martin Larralde <martin.larralde@embl.de>
Date: Wed, 3 Aug 2022 05:04:00 +0200
Subject: [PATCH] Reorganize `Mapper` code and remove unneeded arguments

---
 pyfastani/_fastani.pyx | 148 +++++++++++++++++++----------------------
 1 file changed, 68 insertions(+), 80 deletions(-)

diff --git a/pyfastani/_fastani.pyx b/pyfastani/_fastani.pyx
index 84b3482..456d0c6 100644
--- a/pyfastani/_fastani.pyx
+++ b/pyfastani/_fastani.pyx
@@ -15,10 +15,11 @@ References:
 cimport cython
 cimport cython.parallel
 cimport libcpp11.chrono
+from cpython.buffer cimport Py_buffer, PyBUF_READ
 from cython.operator cimport dereference, preincrement, postincrement
 from cpython.ref cimport Py_INCREF
 from cpython.list cimport PyList_New, PyList_SET_ITEM
-from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, PyCapsule_CheckExact
+from cpython.memoryview cimport PyMemoryView_FromMemory, PyMemoryView_Check, PyMemoryView_GET_BUFFER
 from libc.stdio cimport printf
 from libc.string cimport memcpy
 from libc.limits cimport INT_MAX
@@ -821,8 +822,6 @@ cdef class Mapper(_Parameterized):
 
     @staticmethod
     cdef void _do_l1_mappings(
-        const Parameters_t& param,
-        const Sketch_t& ref_sketch,
         Map_t& map,
         const int kind,
         const void* data,
@@ -843,14 +842,14 @@ cdef class Mapper(_Parameterized):
         cdef MinimizerMapValueType_t hit_position_list
 
         # compute minimizers
-        if param.alphabetSize == 4:
+        if map.param.alphabetSize == 4:
             _add_minimizers_nucl(
                 query.minimizerTableQuery,
                 kind,
                 data,
                 slen,
-                param.kmerSize,
-                param.windowSize,
+                map.param.kmerSize,
+                map.param.windowSize,
                 0,
             )
         else:
@@ -859,8 +858,8 @@
                 kind,
                 data,
                 slen,
-                param.kmerSize,
-                param.windowSize,
+                map.param.kmerSize,
+                map.param.windowSize,
                 0,
             )
 
@@ -879,15 +878,15 @@ cdef class Mapper(_Parameterized):
         # keep minimizer if it exist in the reference lookup index
         it = query.minimizerTableQuery.begin()
         while it != uniq_end_iter:
-            seed_find = ref_sketch.minimizerPosLookupIndex.const_find(dereference(it).hash)
-            if seed_find != ref_sketch.minimizerPosLookupIndex.end():
+            seed_find = map.refSketch.minimizerPosLookupIndex.const_find(dereference(it).hash)
+            if seed_find != map.refSketch.minimizerPosLookupIndex.end():
                 hit_position_list = dereference(seed_find).second
-                if hit_position_list.size() < ref_sketch.getFreqThreshold():
+                if hit_position_list.size() < map.refSketch.getFreqThreshold():
                     seed_hits_l1.insert(seed_hits_l1.end(), hit_position_list.begin(), hit_position_list.end())
             postincrement(it)
 
         # estimate the number of minimum hits, and compute candidates
-        minimum_hits = estimateMinimumHitsRelaxed(query.sketchSize, param.kmerSize, param.percentageIdentity)
+        minimum_hits = estimateMinimumHitsRelaxed(query.sketchSize, map.param.kmerSize, map.param.percentageIdentity)
         map.computeL1CandidateRegions(query, seed_hits_l1, minimum_hits, l1_mappings)
 
     cpdef void _query_fragment(
         self,
         _Map map,
         const int i,
         const seqno_t seq_counter,
-        const unsigned char[::1] sequence,
+        object mem,
+        int kind,
+        int stride,
         _FinalMappings final_mappings,
     ) except *:
         cdef kseq_t kseq
         cdef QueryMetaData_t[kseq_ptr_t, Map_t.MinVec_Type] query
         cdef vector[L1_candidateLocus_t] l1_mappings
-        cdef const unsigned char* fragment
+        cdef Py_buffer* buffer
+        cdef char* data
+        cdef char* fragment
 
-        with nogil:
+        if __debug__:
+            if not PyMemoryView_Check(mem):
+                raise TypeError(f"expected memoryview, got {type(mem).__name__!r}")
+        buffer = PyMemoryView_GET_BUFFER(mem)
 
-            fragment = &sequence[i*self._param.minReadLength]
+        with nogil:
+            data = <char*> buffer.buf
+            fragment = &data[i*self._param.minReadLength*stride]
 
             query.kseq = &kseq
             query.kseq.seq.s = NULL
@@ -913,12 +921,9 @@ cdef class Mapper(_Parameterized):
             query.seqCounter = seq_counter + i
 
             Mapper._do_l1_mappings(
-                # classes with configuration
-                self._param,
-                self._sk[0],
                 map._map[0],
                 # start of the i-th fragment
-                PyUnicode_1BYTE_KIND,
+                kind,
                 fragment,
                 self._param.minReadLength,
                 # outputs
@@ -926,7 +931,11 @@
                 query,
                 l1_mappings,
             )
 
-            map._map.doL2Mapping(query, l1_mappings, final_mappings._vec)
+            map._map.doL2Mapping(
+                query,
+                l1_mappings,
+                final_mappings._vec
+            )
 
     cdef list _query_draft(self, object contigs):
         """Query the sketcher for the given contigs.
@@ -943,11 +952,16 @@ cdef class Mapper(_Parameterized):
         # bookkeeping
         cdef uint64_t total_fragments = 0
         cdef uint64_t total_length = 0
-        # mapping parameters and reference
+        # compute map and L2 mappings
         cdef _Map map
         cdef _FinalMappings final_mappings
         # sequence as a unicode object
+        cdef const unsigned char[::1] view
+        cdef int kind
+        cdef int stride
+        cdef void* data
         cdef ssize_t slen
+        cdef object mem
         # core genomic identity results
         cdef vector[CGI_Results] results
         cdef CGI_Results result
@@ -956,21 +970,14 @@ cdef class Mapper(_Parameterized):
         cdef uint64_t shared_length
         cdef list hits = []
 
-        cdef object l2_lock = threading.Lock()
-
-        # create a new mapper with the given mapping result vector
+        # create a new mapper and a new l2 mapping vector
         map = _Map.__new__(_Map)
         map._map = new Map_t(self._param, self._sk[0], total_fragments, 0)
-
-        # create the result vector
        final_mappings = _FinalMappings.__new__(_FinalMappings)
 
-        #
+        # spawn a thread pool to map fragments in parallel
         with multiprocessing.pool.ThreadPool() as pool:
-
-            # iterate over contigs
             for contig in contigs:
-                # check length of contig is enough for computing mapping
                 slen = len(contig)
                 if slen < min(self._param.windowSize, self._param.kmerSize, self._param.minReadLength):
                     warnings.warn(
@@ -981,65 +988,46 @@ cdef class Mapper(_Parameterized):
                         ),
                         UserWarning,
                     )
-
-                # encode
+                # get a way to read each letter of the contig,
+                # independently of it being `str`, `bytes`, `bytearray`, etc.
                 if isinstance(contig, str):
-                    contig = contig.encode('ascii')
-
-                # # get a way to read each letter of the contig,
-                # # independently of it being `str`, `bytes`, `bytearray`, etc.
-                # if isinstance(contig, str):
-                # # make sure the unicode string is in canonical form,
-                # # --> won't be needed anymore in Python 3.12
-                # IF SYS_VERSION_INFO_MAJOR <= 3 and SYS_VERSION_INFO_MINOR < 12:
-                # PyUnicode_READY(contig)
-                # # get kind and data for efficient indexing
-                # kind = PyUnicode_KIND(contig)
-                # data = PyUnicode_DATA(contig)
-                # slen = PyUnicode_GET_LENGTH(contig)
-                # if kind == PyUnicode_1BYTE_KIND:
-                # stride = sizeof(Py_UCS1)
-                # elif kind == PyUnicode_2BYTE_KIND:
-                # stride = sizeof(Py_UCS2)
-                # else:
-                # stride = sizeof(Py_UCS4)
-                # else:
-                # # attempt to view the contig as a buffer of contiguous bytes
-                # view = contig
-                # # pretend the bytes are an ASCII (UCS-1) encoded string
-                # kind = PyUnicode_1BYTE_KIND
-                # slen = view.shape[0]
-                # stride = sizeof(Py_UCS1)
-                # if slen != 0:
-                # data = <void*> &view[0]
-
+                    # make sure the unicode string is in canonical form,
+                    # --> won't be needed anymore in Python 3.12
+                    IF SYS_VERSION_INFO_MAJOR <= 3 and SYS_VERSION_INFO_MINOR < 12:
+                        PyUnicode_READY(contig)
+                    # get kind and data for efficient indexing
+                    kind = PyUnicode_KIND(contig)
+                    data = PyUnicode_DATA(contig)
+                    slen = PyUnicode_GET_LENGTH(contig)
+                    if kind == PyUnicode_1BYTE_KIND:
+                        stride = sizeof(Py_UCS1)
+                    elif kind == PyUnicode_2BYTE_KIND:
+                        stride = sizeof(Py_UCS2)
+                    else:
+                        stride = sizeof(Py_UCS4)
+                else:
+                    # attempt to view the contig as a buffer of contiguous bytes
+                    view = contig
+                    # pretend the bytes are an ASCII (UCS-1) encoded string
+                    kind = PyUnicode_1BYTE_KIND
+                    slen = view.shape[0]
+                    stride = sizeof(Py_UCS1)
+                    if slen != 0:
+                        data = <void*> &view[0]
+                # turn the void pointer into a Python object so it can
+                # be passed to another Python method
+                mem = PyMemoryView_FromMemory(<char*> data, slen*stride, PyBUF_READ)
                 # compute the expected number of blocks
                 fragment_count = slen // self._param.minReadLength
-
-                # map the blocks
-                # for i in range(fragment_count):
-                # self._query_fragment(
-                # map,
-                # # position in query
-                # i,
-                # total_fragments,
-                # # sequence
-                # contig,
-                # # result vector
-                # final_mappings
-                # )
+                # map the blocks in parallel
                 pool.map(
-                    lambda i: self._query_fragment(map, i, total_fragments, contig, final_mappings),
+                    lambda i: self._query_fragment(map, i, total_fragments, mem, kind, stride, final_mappings),
                     range(fragment_count),
                 )
-
                 # record the number of fragments
                 total_fragments += fragment_count
                 total_length += slen
-
-        #
-        # printf("FINALL MAPPINGS: %i\n", final_mappings._vec.size())
         # compute core genomic identity after successful mapping
         with nogil:
             computeCGI(
--
GitLab
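
For context, the following is a minimal pure-Python sketch, not part of the patch, of the fragment fan-out pattern that the patched `_query_draft` relies on: each contig is exposed through a single shared memoryview, cut into fixed-size fragments, and the fragments are mapped in parallel by a `multiprocessing.pool.ThreadPool`. The names `min_read_length`, `map_fragment` and `map_contig` are hypothetical stand-ins for the Cython internals (`Parameters_t.minReadLength`, `Mapper._query_fragment`), not part of the pyfastani API.

    import multiprocessing.pool

    min_read_length = 3000  # hypothetical stand-in for Parameters_t.minReadLength

    def map_fragment(mem, i):
        # stand-in for Mapper._query_fragment: map the i-th fragment of one contig
        fragment = mem[i * min_read_length:(i + 1) * min_read_length]
        ...  # the L1/L2 mapping of `fragment` happens here in the real extension

    def map_contig(contig):
        # normalize str/bytes/bytearray input to a contiguous buffer, then share a
        # single memoryview between all worker threads (no per-fragment copies)
        data = contig.encode("ascii") if isinstance(contig, str) else bytes(contig)
        mem = memoryview(data)
        n_fragments = len(mem) // min_read_length
        with multiprocessing.pool.ThreadPool() as pool:
            pool.map(lambda i: map_fragment(mem, i), range(n_fragments))

A thread pool rather than a process pool is a reasonable fit here because, as the diff shows, `_query_fragment` does its heavy lifting inside a `with nogil:` block, so worker threads can run the C++ mapping code concurrently while sharing the same contig buffer.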