#! /usr/bin/env python # -*- coding: utf-8 -*- # vim:fenc=utf-8 # # Copyright © 2017 lafrite # # Distributed under terms of the MIT license. """ Converting a string with coroutines """ from functools import partial from .coroutine import * __all__ = ["str2", ] def maybe_it_is(cara): """ Return a function which return Maybe if cara startwith the argument and True if it is cara :exemple: >>> it_is_pipo = maybe_it_is("pipo") >>> it_is_pipo("pi") 'maybe' >>> it_is_pipo("pipo") True >>> it_is_pipo("uo") False >>> it_is_iuo = maybe_it_is("iuo") >>> it_is_iuo("pi") False >>> it_is_iuo("iuo") True >>> it_is_iuo("uo") False """ def func(c): if c == cara: return True elif cara.startswith(c): return "maybe" else: return False return func def something_in(cara_list): """ Return a function which sais whether a caracter is in cara_list or not """ def func(c): if c in cara_list: return True else: return False return func @coroutine def lookfor(condition, replace = lambda x:''.join(x)): """ Sink which is looking for term and yield replace founded patern with the lenght of the accumulation. It can be reset by sending None :param condition: conditional function which returns a boolean or "maybe" :param replace: function which make a final transformation to the founded string. Id by default. :example: >>> lf = lookfor(maybe_it_is("pipo"), lambda x: 'remp') >>> acc = "" >>> for i in 'popipo': ... a = lf.send(i) ... if a == "maybe": ... acc += i ... elif a: ... acc = "" ... print(a) ... else: ... print(acc + i) ... acc = "" po remp >>> lf = lookfor(maybe_it_is("po") , lambda x: ''.join(x).upper()) >>> acc = "" >>> for i in 'popipo': ... a = lf.send(i) ... if a == "maybe": ... acc += i ... elif a: ... acc = "" ... print(a) ... else: ... print(acc + i) ... acc = "" PO pi PO >>> lf = lookfor(maybe_it_is("pi") , lambda x: 1) >>> acc = "" >>> for i in 'popipop': ... a = lf.send(i) ... if a == "maybe": ... acc += i ... elif a: ... acc = "" ... print(a) ... else: ... print(acc + i) ... acc = "" po 1 po >>> # the p in still processing >>> acc = "" >>> for i in 'iop': ... a = lf.send(i) ... if a == "maybe": ... acc += i ... elif a: ... acc = "" ... print(a) ... else: ... print(acc + i) ... acc = "" 1 o >>> # the p in still processing >>> lf.throw(RESTAAART) False >>> # p has been forgot >>> acc = "" >>> for i in 'ipopi': ... a = lf.send(i) ... if a == "maybe": ... acc += i ... elif a: ... acc = "" ... print(a) ... else: ... print(acc + i) ... acc = "" i po 1 """ acc = [] ans = False while True: try: c = (yield ans) except RESTAAART as err: acc = [] ans = False else: if c is not None: acc.append(c) found = condition(''.join(acc)) if found == "maybe": ans = "maybe" elif found: ans = replace(acc) acc = [] else: ans = False acc = [] @coroutine def remember_lookfor(lookfor): """ Coroutine which remember sent value before the lookfor finds something :example: >>> lkf = lookfor(maybe_it_is("pipo"), lambda x: 'remp') >>> rmb_lkf = remember_lookfor(lkf) >>> for i in 'popipopi': ... print(rmb_lkf.send(i)) maybe False maybe maybe maybe ['p', 'o', 'remp'] maybe maybe >>> for i in 'popi': ... print(rmb_lkf.send(i)) maybe ['remp'] maybe maybe >>> rmb_lkf.throw(RESTAAART) ['p', 'i'] >>> for i in 'popi': ... print(rmb_lkf.send(i)) maybe False maybe maybe """ acc = [] mb_acc = [] ans = False while True: try: c = (yield ans) except RESTAAART as err: lookfor.throw(err) ans = acc + mb_acc acc = [] mb_acc = [] else: lkf_state = lookfor.send(c) if lkf_state == "maybe": mb_acc.append(c) ans = "maybe" elif lkf_state: ans = acc + [lkf_state] acc = [] mb_acc = [] else: acc += mb_acc mb_acc = [] acc.append(c) ans = False @coroutine def concurent_broadcast(target, lookfors = []): """ Coroutine which broadcasts multiple lookfor coroutines and reinitializes them when one found something Same as parallelized_lookfor but coroutine version :param target: target to send data to :param lookfors: list of lookfor coroutines :example: >>> lf1 = lookfor(maybe_it_is("abc"), lambda x: "".join(x).upper()) >>> searcher = concurent_broadcast(list_sink, [lf1]) >>> for i in "azabcab": ... searcher.send(i) >>> a = searcher.throw(STOOOP) >>> print(a) ['a', 'z', 'ABC', 'a', 'b'] >>> lf2 = lookfor(maybe_it_is("az")) >>> searcher = concurent_broadcast(list_sink, [lf1, lf2]) >>> for i in "azabcabazb": ... searcher.send(i) >>> a = searcher.throw(STOOOP) >>> print(a) ['az', 'ABC', 'a', 'b', 'az', 'b'] >>> lfop = lookfor(something_in("+-*/()"), lambda x: f"op{x}") >>> searcher = concurent_broadcast(list_sink, [lfop]) >>> for i in '12+3+234': ... searcher.send(i) >>> a = searcher.throw(STOOOP) >>> print(a) ['1', '2', "op['+']", '3', "op['+']", '2', '3', '4'] >>> # need to remake a searcher otherwise it remenbers old ans >>> searcher = concurent_broadcast(list_sink, [lfop]) >>> for i in '12*(3+234)': ... searcher.send(i) >>> a = searcher.throw(STOOOP) >>> print(a) ['1', '2', "op['*']", "op['(']", '3', "op['+']", '2', '3', '4', "op[')']"] >>> lfsqrt = lookfor(maybe_it_is("sqrt"), lambda x: f"op['sqrt']") >>> searcher = concurent_broadcast(list_sink, [lfop, lfsqrt]) >>> for i in '3+2*sqrt(3)': ... searcher.send(i) >>> a = searcher.throw(STOOOP) >>> print(a) ['3', "op['+']", '2', "op['*']", "op['sqrt']", "op['(']", '3', "op[')']"] """ try: target_ = target() except TypeError: target_ = target lookfors_ = [remember_lookfor(lkf) for lkf in lookfors] stock = [] ans = [] try: while True: found = False tok = yield if tok is not None: for lf in lookfors_: lf_ans = lf.send(tok) if lf_ans and lf_ans != "maybe": found = lf_ans break if found: for lf in lookfors_: lf.throw(RESTAAART) for i in found: ans = target_.send(i) except STOOOP as err: for lf in lookfors_: last = lf.throw(RESTAAART) for i in last: target_.send(i) yield target_.throw(err) @coroutine def lookforNumbers(target): """ Coroutine which parse numbers :exemple: >>> str2list = lookforNumbers(list_sink) >>> for i in "12+1234*67": ... str2list.send(i) >>> a = str2list.throw(STOOOP) >>> print(a) ['12', '+', '1234', '*', '67'] >>> str2list = lookforNumbers(list_sink) >>> for i in "1.2+12.34*67": ... str2list.send(i) >>> a = str2list.throw(STOOOP) >>> print(a) ['1.2', '+', '12.34', '*', '67'] >>> str2list = lookforNumbers(list_sink) >>> for i in "12.3.4*67": ... str2list.send(i) Traceback (most recent call last): ... ValueError: Can't build a number with 2 dots (current is 12.3) >>> str2list = lookforNumbers(list_sink) >>> for i in ".34*67": ... a = str2list.send(i) Traceback (most recent call last): ... ValueError: Can't build a number starting with a dot """ try: target_ = target() except TypeError: target_ = target current = "" ans = [] try: while True: tok = yield if tok is not None: try: int(tok) except ValueError: if tok == '.': if "." in current: raise ValueError(f"Can't build a number with 2 dots (current is {current})") elif len(current) == 0: raise ValueError(f"Can't build a number starting with a dot") else: current += tok else: if current: target_.send(current) current = "" target_.send(tok) else: current += tok except STOOOP as err: if current: target_.send(current) yield target_.throw(err) @coroutine def pparser(target): """ Parenthesis parser sink :example: >>> pp = pparser(list_sink) >>> for i in "buio": ... pp.send(i) >>> a = pp.throw(STOOOP) >>> a ['b', 'u', 'i', 'o'] >>> pp = pparser(list_sink) >>> for i in "agg(bcz)e(i(o))": ... pp.send(i) >>> a = pp.throw(STOOOP) >>> a ['a', 'g', 'g', ['b', 'c', 'z'], 'e', ['i', ['o']]] """ target_ = target() try: while True: caract = yield if caract == "(": a = yield from pparser(target) target_.send(a) elif caract == ")": a = target_.throw(STOOOP) return a else: target_.send(caract) except STOOOP as err: yield target_.throw(err) @coroutine def list_sink(): """ Testing sink for coroutines :example: >>> sink = list_sink() >>> for i in '12+34 * 3': ... sink.send(i) >>> a = sink.throw(STOOOP) >>> a ['1', '2', '+', '3', '4', ' ', '*', ' ', '3'] """ ans = list() try: while True: c = (yield) if c is not None: ans.append(c) except STOOOP: yield ans def str2(sink): """ Return a pipeline which parse an expression with the sink as an endpont :example: >>> str2nestedlist = str2(list_sink) >>> exp = "12+3*4" >>> t = str2nestedlist(exp) >>> print(t) ['12', '+', '3', '*', '4'] >>> exp = "12*3+4" >>> t = str2nestedlist(exp) >>> print(t) ['12', '*', '3', '+', '4'] >>> exp = "12*(3+4)" >>> t = str2nestedlist(exp) >>> print(t) ['12', '*', ['3', '+', '4']] >>> from .tree import MutableTree >>> str2tree = str2(MutableTree.sink) >>> exp = "12+3*4" >>> t = str2tree(exp) >>> print(t) + > 12 > * | > 3 | > 4 >>> exp = "12*3+4" >>> t = str2tree(exp) >>> print(t) + > * | > 12 | > 3 > 4 >>> exp = "12*(3+4)" >>> t = str2tree(exp) >>> print(t) * > 12 > + | > 3 | > 4 """ lfop = lookfor(something_in("+-*/")) operator_corout = partial(concurent_broadcast, lookfors = [lfop]) def pipeline(expression): str2_corout = lookforNumbers(operator_corout(pparser(sink))) for i in expression: str2_corout.send(i) a = str2_corout.throw(STOOOP) return a return pipeline str2nestedlist = str2(list_sink) # ----------------------------- # Reglages pour 'vim' # vim:set autoindent expandtab tabstop=4 shiftwidth=4: # cursor: 16 del