508 lines
12 KiB
Python
508 lines
12 KiB
Python
|
#! /usr/bin/env python
|
||
|
# -*- coding: utf-8 -*-
|
||
|
# vim:fenc=utf-8
|
||
|
#
|
||
|
# Copyright © 2017 lafrite <lafrite@Poivre>
|
||
|
#
|
||
|
# Distributed under terms of the MIT license.
|
||
|
|
||
|
"""
|
||
|
Converting a string with coroutines
|
||
|
"""
|
||
|
|
||
|
from functools import partial
|
||
|
from .coroutine import *
|
||
|
|
||
|
__all__ = ["str2", ]
|
||
|
|
||
|
def maybe_it_is(cara):
|
||
|
""" Return a function which return
|
||
|
|
||
|
Maybe if cara startwith the argument and True if it is cara
|
||
|
|
||
|
:exemple:
|
||
|
|
||
|
>>> it_is_pipo = maybe_it_is("pipo")
|
||
|
>>> it_is_pipo("pi")
|
||
|
'maybe'
|
||
|
>>> it_is_pipo("pipo")
|
||
|
True
|
||
|
>>> it_is_pipo("uo")
|
||
|
False
|
||
|
>>> it_is_iuo = maybe_it_is("iuo")
|
||
|
>>> it_is_iuo("pi")
|
||
|
False
|
||
|
>>> it_is_iuo("iuo")
|
||
|
True
|
||
|
>>> it_is_iuo("uo")
|
||
|
False
|
||
|
"""
|
||
|
def func(c):
|
||
|
if c == cara:
|
||
|
return True
|
||
|
elif cara.startswith(c):
|
||
|
return "maybe"
|
||
|
else:
|
||
|
return False
|
||
|
return func
|
||
|
|
||
|
def something_in(cara_list):
|
||
|
""" Return a function which sais whether a caracter is in cara_list or not
|
||
|
"""
|
||
|
def func(c):
|
||
|
if c in cara_list:
|
||
|
return True
|
||
|
else:
|
||
|
return False
|
||
|
return func
|
||
|
|
||
|
@coroutine
|
||
|
def lookfor(condition, replace = lambda x:''.join(x)):
|
||
|
""" Sink which is looking for term and yield replace founded patern
|
||
|
with the lenght of the accumulation.
|
||
|
|
||
|
It can be reset by sending None
|
||
|
|
||
|
:param condition: conditional function which returns a boolean or "maybe"
|
||
|
:param replace: function which make a final transformation to the founded
|
||
|
string. Id by default.
|
||
|
|
||
|
:example:
|
||
|
|
||
|
>>> lf = lookfor(maybe_it_is("pipo"), lambda x: 'remp')
|
||
|
>>> acc = ""
|
||
|
>>> for i in 'popipo':
|
||
|
... a = lf.send(i)
|
||
|
... if a == "maybe":
|
||
|
... acc += i
|
||
|
... elif a:
|
||
|
... acc = ""
|
||
|
... print(a)
|
||
|
... else:
|
||
|
... print(acc + i)
|
||
|
... acc = ""
|
||
|
po
|
||
|
remp
|
||
|
>>> lf = lookfor(maybe_it_is("po") , lambda x: ''.join(x).upper())
|
||
|
>>> acc = ""
|
||
|
>>> for i in 'popipo':
|
||
|
... a = lf.send(i)
|
||
|
... if a == "maybe":
|
||
|
... acc += i
|
||
|
... elif a:
|
||
|
... acc = ""
|
||
|
... print(a)
|
||
|
... else:
|
||
|
... print(acc + i)
|
||
|
... acc = ""
|
||
|
PO
|
||
|
pi
|
||
|
PO
|
||
|
>>> lf = lookfor(maybe_it_is("pi") , lambda x: 1)
|
||
|
>>> acc = ""
|
||
|
>>> for i in 'popipop':
|
||
|
... a = lf.send(i)
|
||
|
... if a == "maybe":
|
||
|
... acc += i
|
||
|
... elif a:
|
||
|
... acc = ""
|
||
|
... print(a)
|
||
|
... else:
|
||
|
... print(acc + i)
|
||
|
... acc = ""
|
||
|
po
|
||
|
1
|
||
|
po
|
||
|
>>> # the p in still processing
|
||
|
>>> acc = ""
|
||
|
>>> for i in 'iop':
|
||
|
... a = lf.send(i)
|
||
|
... if a == "maybe":
|
||
|
... acc += i
|
||
|
... elif a:
|
||
|
... acc = ""
|
||
|
... print(a)
|
||
|
... else:
|
||
|
... print(acc + i)
|
||
|
... acc = ""
|
||
|
1
|
||
|
o
|
||
|
>>> # the p in still processing
|
||
|
>>> lf.throw(RESTAAART)
|
||
|
False
|
||
|
>>> # p has been forgot
|
||
|
>>> acc = ""
|
||
|
>>> for i in 'ipopi':
|
||
|
... a = lf.send(i)
|
||
|
... if a == "maybe":
|
||
|
... acc += i
|
||
|
... elif a:
|
||
|
... acc = ""
|
||
|
... print(a)
|
||
|
... else:
|
||
|
... print(acc + i)
|
||
|
... acc = ""
|
||
|
i
|
||
|
po
|
||
|
1
|
||
|
|
||
|
"""
|
||
|
acc = []
|
||
|
ans = False
|
||
|
while True:
|
||
|
try:
|
||
|
c = (yield ans)
|
||
|
except RESTAAART as err:
|
||
|
acc = []
|
||
|
ans = False
|
||
|
else:
|
||
|
if c is not None:
|
||
|
acc.append(c)
|
||
|
found = condition(''.join(acc))
|
||
|
if found == "maybe":
|
||
|
ans = "maybe"
|
||
|
elif found:
|
||
|
ans = replace(acc)
|
||
|
acc = []
|
||
|
else:
|
||
|
ans = False
|
||
|
acc = []
|
||
|
|
||
|
@coroutine
|
||
|
def remember_lookfor(lookfor):
|
||
|
""" Coroutine which remember sent value before the lookfor finds something
|
||
|
|
||
|
:example:
|
||
|
|
||
|
>>> lkf = lookfor(maybe_it_is("pipo"), lambda x: 'remp')
|
||
|
>>> rmb_lkf = remember_lookfor(lkf)
|
||
|
>>> for i in 'popipopi':
|
||
|
... print(rmb_lkf.send(i))
|
||
|
maybe
|
||
|
False
|
||
|
maybe
|
||
|
maybe
|
||
|
maybe
|
||
|
['p', 'o', 'remp']
|
||
|
maybe
|
||
|
maybe
|
||
|
>>> for i in 'popi':
|
||
|
... print(rmb_lkf.send(i))
|
||
|
maybe
|
||
|
['remp']
|
||
|
maybe
|
||
|
maybe
|
||
|
>>> rmb_lkf.throw(RESTAAART)
|
||
|
['p', 'i']
|
||
|
>>> for i in 'popi':
|
||
|
... print(rmb_lkf.send(i))
|
||
|
maybe
|
||
|
False
|
||
|
maybe
|
||
|
maybe
|
||
|
|
||
|
"""
|
||
|
acc = []
|
||
|
mb_acc = []
|
||
|
ans = False
|
||
|
while True:
|
||
|
try:
|
||
|
c = (yield ans)
|
||
|
except RESTAAART as err:
|
||
|
lookfor.throw(err)
|
||
|
ans = acc + mb_acc
|
||
|
acc = []
|
||
|
mb_acc = []
|
||
|
else:
|
||
|
lkf_state = lookfor.send(c)
|
||
|
if lkf_state == "maybe":
|
||
|
mb_acc.append(c)
|
||
|
ans = "maybe"
|
||
|
elif lkf_state:
|
||
|
ans = acc + [lkf_state]
|
||
|
acc = []
|
||
|
mb_acc = []
|
||
|
else:
|
||
|
acc += mb_acc
|
||
|
mb_acc = []
|
||
|
acc.append(c)
|
||
|
ans = False
|
||
|
|
||
|
@coroutine
|
||
|
def concurent_broadcast(target, lookfors = []):
|
||
|
""" Coroutine which broadcasts multiple lookfor coroutines and reinitializes
|
||
|
them when one found something
|
||
|
|
||
|
Same as parallelized_lookfor but coroutine version
|
||
|
|
||
|
:param target: target to send data to
|
||
|
:param lookfors: list of lookfor coroutines
|
||
|
|
||
|
:example:
|
||
|
>>> lf1 = lookfor(maybe_it_is("abc"), lambda x: "".join(x).upper())
|
||
|
>>> searcher = concurent_broadcast(list_sink, [lf1])
|
||
|
>>> for i in "azabcab":
|
||
|
... searcher.send(i)
|
||
|
>>> a = searcher.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['a', 'z', 'ABC', 'a', 'b']
|
||
|
|
||
|
>>> lf2 = lookfor(maybe_it_is("az"))
|
||
|
>>> searcher = concurent_broadcast(list_sink, [lf1, lf2])
|
||
|
>>> for i in "azabcabazb":
|
||
|
... searcher.send(i)
|
||
|
>>> a = searcher.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['az', 'ABC', 'a', 'b', 'az', 'b']
|
||
|
|
||
|
>>> lfop = lookfor(something_in("+-*/()"), lambda x: f"op{x}")
|
||
|
>>> searcher = concurent_broadcast(list_sink, [lfop])
|
||
|
>>> for i in '12+3+234':
|
||
|
... searcher.send(i)
|
||
|
>>> a = searcher.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['1', '2', "op['+']", '3', "op['+']", '2', '3', '4']
|
||
|
|
||
|
>>> # need to remake a searcher otherwise it remenbers old ans
|
||
|
>>> searcher = concurent_broadcast(list_sink, [lfop])
|
||
|
>>> for i in '12*(3+234)':
|
||
|
... searcher.send(i)
|
||
|
>>> a = searcher.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['1', '2', "op['*']", "op['(']", '3', "op['+']", '2', '3', '4', "op[')']"]
|
||
|
|
||
|
>>> lfsqrt = lookfor(maybe_it_is("sqrt"), lambda x: f"op['sqrt']")
|
||
|
>>> searcher = concurent_broadcast(list_sink, [lfop, lfsqrt])
|
||
|
>>> for i in '3+2*sqrt(3)':
|
||
|
... searcher.send(i)
|
||
|
>>> a = searcher.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['3', "op['+']", '2', "op['*']", "op['sqrt']", "op['(']", '3', "op[')']"]
|
||
|
"""
|
||
|
try:
|
||
|
target_ = target()
|
||
|
except TypeError:
|
||
|
target_ = target
|
||
|
|
||
|
lookfors_ = [remember_lookfor(lkf) for lkf in lookfors]
|
||
|
|
||
|
stock = []
|
||
|
ans = []
|
||
|
try:
|
||
|
while True:
|
||
|
found = False
|
||
|
tok = yield
|
||
|
if tok is not None:
|
||
|
for lf in lookfors_:
|
||
|
lf_ans = lf.send(tok)
|
||
|
if lf_ans and lf_ans != "maybe":
|
||
|
found = lf_ans
|
||
|
break
|
||
|
|
||
|
if found:
|
||
|
for lf in lookfors_:
|
||
|
lf.throw(RESTAAART)
|
||
|
for i in found:
|
||
|
ans = target_.send(i)
|
||
|
except STOOOP as err:
|
||
|
for lf in lookfors_:
|
||
|
last = lf.throw(RESTAAART)
|
||
|
for i in last:
|
||
|
target_.send(i)
|
||
|
yield target_.throw(err)
|
||
|
|
||
|
@coroutine
|
||
|
def lookforNumbers(target):
|
||
|
""" Coroutine which parse numbers
|
||
|
|
||
|
|
||
|
:exemple:
|
||
|
>>> str2list = lookforNumbers(list_sink)
|
||
|
>>> for i in "12+1234*67":
|
||
|
... str2list.send(i)
|
||
|
>>> a = str2list.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['12', '+', '1234', '*', '67']
|
||
|
|
||
|
>>> str2list = lookforNumbers(list_sink)
|
||
|
>>> for i in "1.2+12.34*67":
|
||
|
... str2list.send(i)
|
||
|
>>> a = str2list.throw(STOOOP)
|
||
|
>>> print(a)
|
||
|
['1.2', '+', '12.34', '*', '67']
|
||
|
|
||
|
>>> str2list = lookforNumbers(list_sink)
|
||
|
>>> for i in "12.3.4*67":
|
||
|
... str2list.send(i)
|
||
|
Traceback (most recent call last):
|
||
|
...
|
||
|
ValueError: Can't build a number with 2 dots (current is 12.3)
|
||
|
>>> str2list = lookforNumbers(list_sink)
|
||
|
>>> for i in ".34*67":
|
||
|
... a = str2list.send(i)
|
||
|
Traceback (most recent call last):
|
||
|
...
|
||
|
ValueError: Can't build a number starting with a dot
|
||
|
|
||
|
"""
|
||
|
try:
|
||
|
target_ = target()
|
||
|
except TypeError:
|
||
|
target_ = target
|
||
|
|
||
|
current = ""
|
||
|
ans = []
|
||
|
try:
|
||
|
while True:
|
||
|
tok = yield
|
||
|
if tok is not None:
|
||
|
try:
|
||
|
int(tok)
|
||
|
except ValueError:
|
||
|
if tok == '.':
|
||
|
if "." in current:
|
||
|
raise ValueError(f"Can't build a number with 2 dots (current is {current})")
|
||
|
elif len(current) == 0:
|
||
|
raise ValueError(f"Can't build a number starting with a dot")
|
||
|
else:
|
||
|
current += tok
|
||
|
else:
|
||
|
if current:
|
||
|
target_.send(current)
|
||
|
current = ""
|
||
|
target_.send(tok)
|
||
|
else:
|
||
|
current += tok
|
||
|
|
||
|
except STOOOP as err:
|
||
|
if current:
|
||
|
target_.send(current)
|
||
|
yield target_.throw(err)
|
||
|
|
||
|
@coroutine
|
||
|
def pparser(target):
|
||
|
""" Parenthesis parser sink
|
||
|
|
||
|
:example:
|
||
|
|
||
|
>>> pp = pparser(list_sink)
|
||
|
>>> for i in "buio":
|
||
|
... pp.send(i)
|
||
|
>>> a = pp.throw(STOOOP)
|
||
|
>>> a
|
||
|
['b', 'u', 'i', 'o']
|
||
|
>>> pp = pparser(list_sink)
|
||
|
>>> for i in "agg(bcz)e(i(o))":
|
||
|
... pp.send(i)
|
||
|
>>> a = pp.throw(STOOOP)
|
||
|
>>> a
|
||
|
['a', 'g', 'g', ['b', 'c', 'z'], 'e', ['i', ['o']]]
|
||
|
|
||
|
"""
|
||
|
target_ = target()
|
||
|
|
||
|
try:
|
||
|
while True:
|
||
|
caract = yield
|
||
|
if caract == "(":
|
||
|
a = yield from pparser(target)
|
||
|
target_.send(a)
|
||
|
elif caract == ")":
|
||
|
a = target_.throw(STOOOP)
|
||
|
return a
|
||
|
else:
|
||
|
target_.send(caract)
|
||
|
except STOOOP as err:
|
||
|
yield target_.throw(err)
|
||
|
|
||
|
@coroutine
|
||
|
def list_sink():
|
||
|
""" Testing sink for coroutines
|
||
|
|
||
|
:example:
|
||
|
|
||
|
>>> sink = list_sink()
|
||
|
>>> for i in '12+34 * 3':
|
||
|
... sink.send(i)
|
||
|
>>> a = sink.throw(STOOOP)
|
||
|
>>> a
|
||
|
['1', '2', '+', '3', '4', ' ', '*', ' ', '3']
|
||
|
|
||
|
"""
|
||
|
ans = list()
|
||
|
try:
|
||
|
while True:
|
||
|
c = (yield)
|
||
|
if c is not None:
|
||
|
ans.append(c)
|
||
|
except STOOOP:
|
||
|
yield ans
|
||
|
|
||
|
def str2(sink):
|
||
|
""" Return a pipeline which parse an expression with the sink as an endpont
|
||
|
|
||
|
:example:
|
||
|
|
||
|
>>> str2nestedlist = str2(list_sink)
|
||
|
>>> exp = "12+3*4"
|
||
|
>>> t = str2nestedlist(exp)
|
||
|
>>> print(t)
|
||
|
['12', '+', '3', '*', '4']
|
||
|
>>> exp = "12*3+4"
|
||
|
>>> t = str2nestedlist(exp)
|
||
|
>>> print(t)
|
||
|
['12', '*', '3', '+', '4']
|
||
|
>>> exp = "12*(3+4)"
|
||
|
>>> t = str2nestedlist(exp)
|
||
|
>>> print(t)
|
||
|
['12', '*', ['3', '+', '4']]
|
||
|
|
||
|
>>> from .tree import MutableTree
|
||
|
>>> str2tree = str2(MutableTree.sink)
|
||
|
>>> exp = "12+3*4"
|
||
|
>>> t = str2tree(exp)
|
||
|
>>> print(t)
|
||
|
+
|
||
|
> 12
|
||
|
> *
|
||
|
| > 3
|
||
|
| > 4
|
||
|
>>> exp = "12*3+4"
|
||
|
>>> t = str2tree(exp)
|
||
|
>>> print(t)
|
||
|
+
|
||
|
> *
|
||
|
| > 12
|
||
|
| > 3
|
||
|
> 4
|
||
|
>>> exp = "12*(3+4)"
|
||
|
>>> t = str2tree(exp)
|
||
|
>>> print(t)
|
||
|
*
|
||
|
> 12
|
||
|
> +
|
||
|
| > 3
|
||
|
| > 4
|
||
|
"""
|
||
|
lfop = lookfor(something_in("+-*/"))
|
||
|
operator_corout = partial(concurent_broadcast, lookfors = [lfop])
|
||
|
|
||
|
def pipeline(expression):
|
||
|
str2_corout = lookforNumbers(operator_corout(pparser(sink)))
|
||
|
for i in expression:
|
||
|
str2_corout.send(i)
|
||
|
a = str2_corout.throw(STOOOP)
|
||
|
|
||
|
return a
|
||
|
|
||
|
return pipeline
|
||
|
|
||
|
|
||
|
str2nestedlist = str2(list_sink)
|
||
|
|
||
|
|
||
|
# -----------------------------
|
||
|
# Reglages pour 'vim'
|
||
|
# vim:set autoindent expandtab tabstop=4 shiftwidth=4:
|
||
|
# cursor: 16 del
|