import numpy as np
from enum import Enum
from typing import Union, List
from dataclasses import dataclass
import time
from mpt4py.base import Vector, Matrix, IntVector
from mpt4py.tolerances import tolerances
from mpt4py.util import lexico_argmax, lexico_argmin
class LcpSolverStatus(Enum):
LCP_FEASIBLE = 0
LCP_INFEASIBLE = -1
LCP_RAYTERMINATION = -2
LCP_PRETERMINATED = -3
LCP_CODEERROR = -4
@dataclass
class LcpSolution:
w: Vector | None
z: Vector | None
basis: List[int] | None
status: LcpSolverStatus
def get_variable_name(index: Union[int, List[int], IntVector], n: int) -> Union[str, List[str]]:
"""
Get the variable name corresponding to the given index in the LCP problem.
Args:
index (Union[int, List[int], IntVector]): The index or indices of the variable(s).
n (int): The size of the problem (number of variables).
Returns:
Union[str, List[str]]: The variable name(s) as a string or a list of strings.
Raises:
ValueError: If the index is out of bounds.
"""
def single_variable_name(idx: int) -> str:
if idx <= n - 1:
return f'w{idx + 1}'
elif idx <= 2 * n - 1:
return f'z{idx + 1 - n}'
elif idx == 2 * n:
return 'alpha' # The artificial variable in Lemke's method
else:
raise ValueError("Index out of bounds")
if isinstance(index, (int, np.integer)):
return single_variable_name(index)
elif isinstance(index, (list, np.ndarray)):
return [single_variable_name(k) for k in index]
else:
raise TypeError("Index must be an int, list, or numpy array")
def get_complement(index: Union[int, List[int], IntVector], n: int) -> Union[int, List[int], IntVector]:
"""
Get the complement of the given index in the context of an LCP problem.
Args:
index (Union[int, List[int], IntVector]): The index or indices to find the complement for.
n (int): The size of the problem (number of variables).
Returns:
Union[int, List[int], IntVector]: The complement index or indices.
Raises:
ValueError: If any index is out of bounds.
"""
if isinstance(index, (int, np.integer)):
if index < 0 or index >= 2 * n:
raise ValueError("Index out of bounds")
return (index + n) % (2 * n)
elif isinstance(index, (list, np.ndarray)):
index_array = np.asarray(index, dtype=int)
if np.any(index_array < 0) or np.any(index_array >= 2 * n):
raise ValueError("Index out of bounds")
return ((index_array + n) % (2 * n)).tolist() if isinstance(index, list) else (index_array + n) % (2 * n)
else:
raise TypeError("Index must be an int, list, or numpy array")
[docs]
def lcp_solve(M: Matrix, q: Vector,
solver: str = 'graves',
max_pivots: int = 1000,
max_time: float = 20,
zero_tol: float = tolerances['zero'],
verbose: bool = False
) -> LcpSolution:
"""
Solve a Linear Complementarity Problem (LCP)
"""
if M.ndim != 2:
raise ValueError("M must be a matrix")
if q.ndim != 1:
raise ValueError("q must be a vector")
if M.shape[0] != M.shape[1]:
raise ValueError("M must be a square matrix")
if q.size != M.shape[0]:
raise ValueError("Incompatible dimensions: number of rows and columns of M must match the number of elements in q")
if verbose:
print("-------------------------------------------")
print("Linear Complementarity Problem\n")
print("Num variables: {0}".format(q.size))
print("Method: {0}".format(solver))
if solver == "lemke":
return lcp_solve_lemke_method(M=M, q=q, max_pivots=max_pivots, max_time=max_time, zero_tol=zero_tol, verbose=verbose)
elif solver == "graves":
return lcp_solve_graves_method(M=M, q=q, max_pivots=max_pivots, max_time=max_time, zero_tol=zero_tol, verbose=verbose)
else:
raise ValueError("Unsupported solver type")
def lcp_solve_graves_method(M: Matrix, q: Vector, max_pivots: int = 1000,
max_time: float = 20, zero_tol: float = tolerances['zero'], verbose: bool = False) -> LcpSolution:
"""
An implementation of the Graves' Principal Pivot Method
"""
n = np.size(q)
start_time = time.time()
latest_time = start_time
if np.all(q > -zero_tol):
return LcpSolution(w=q, z=np.zeros_like(q), basis=np.arange(n).tolist(), status=LcpSolverStatus.LCP_FEASIBLE)
# basic_var_indices = np.array(range(n))
basic_var_indices = [*range(n)]
A = np.hstack([np.eye(n), -M])
num_pivots = 0
while num_pivots < max_pivots:
current_time = time.time()
if current_time - latest_time > 1.0:
print("{0} seconds, {1} pivots".format(current_time - start_time, num_pivots))
latest_time = time.time()
beta = np.linalg.solve(A[:, basic_var_indices], np.eye(n)) # ! if A_B has special properties, consider using more efficient facrotization methods?
q_bar = beta @ q
if verbose:
print("\n------------------ Iteration", num_pivots, "------------------")
print("Current basis is: ", basic_var_indices)
print("The inverse tableau is:\n", np.hstack([beta, q_bar[:, np.newaxis]]))
# termination condition
if np.all(q_bar >= -zero_tol):
x = np.zeros(2*n)
x[basic_var_indices] = q_bar
return LcpSolution(w=x[:n], z=x[-n:], basis=basic_var_indices, status=LcpSolverStatus.LCP_FEASIBLE)
negative_indices = np.where(q_bar < -zero_tol)[0]
test_matrix = beta[negative_indices, :] / q_bar[negative_indices, np.newaxis]
r = negative_indices[lexico_argmax(test_matrix)][0]
t_r = get_complement(basic_var_indices[r], n) # complement of y_r, where y_r is in basic variables. t_r is in non-basic variables
A_r = A[:, t_r] # the column vector corresponding to t_r in original tableau [I, -M]
A_bar_r = np.linalg.solve(A[:, basic_var_indices], A_r) # A_bar_r = beta * A_r = [a_bar_1_r, a_bar_2_r, ..., a_bar_n_r]
a_bar_r_r = A_bar_r[r]
# * case 1, a_bar_r_r is non-zero
if abs(a_bar_r_r) > zero_tol:
basic_var_indices[r] = get_complement(basic_var_indices[r], n)
# * case 2, a_bar_r_r = 0 and a_bar_i_r <= 0 for all i, means the LCP is infeasible
elif np.all(A_bar_r <= zero_tol):
return LcpSolution(w=None, z=None, basis=None, status=LcpSolverStatus.LCP_INFEASIBLE)
# * case 3
else:
positive_indices = np.where(A_bar_r > zero_tol)[0]
test_matrix = beta[positive_indices, :] - np.outer(q_bar[positive_indices] / q_bar[r] / A_bar_r[positive_indices], beta[r, :])
s = positive_indices[lexico_argmin(test_matrix)[0]]
basic_var_indices[r] = get_complement(basic_var_indices[r], n)
basic_var_indices[s] = get_complement(basic_var_indices[s], n)
num_pivots += 1
if verbose:
print("Leaving variable is: ", get_variable_name(r, n))
print("Entering variable is: ", get_variable_name((r+n) % (2*n), n))
print("After pivoting, the new basis is: ", get_variable_name(basic_var_indices, n))
return LcpSolution(w=None, z=None, basis=None, status=LcpSolverStatus.LCP_PRETERMINATED)
def lcp_solve_lemke_method(M: Matrix, q: Vector, max_pivots: int = 1000,
max_time: float = 20, zero_tol: float = tolerances['zero'], verbose: bool = False) -> LcpSolution:
"""
Solve the LCP using Complementary Pivot Method (also called Lemke's method)
Refer to Chapter 2 of Linear Complementarity, Linear and Nonlinear Programming by Katta G. Murty
When ray termination occurs, either the LCP is infeasible, or the LCP is feasible but the algorithm is unable to find the solution.
However, when M satisfies some conditions, ray termination occurs only if the LCP is infeasible.
"""
if np.all(q > -zero_tol):
return LcpSolution(w=q, z=np.zeros_like(q), basis=np.arange(q.size).tolist(), status=LcpSolverStatus.LCP_FEASIBLE)
n = np.size(q)
basic_var_indices = [*range(n)] # indices of basic variables
tableau = np.hstack((np.eye(n), -M, -np.ones((n,1)), q.reshape(-1,1)))
pivot_row_index = int(np.argmin(q)) # ! what happens if there're two elements in q which are equivalent?
if verbose:
print("Initial tablaeu is:\n", tableau)
print("\n------------------ Iteration", 0, "------------------")
print("Leaving variable is: ", get_variable_name(basic_var_indices[pivot_row_index], n))
print("Entering variable is: z0")
# You have to find the complementary index before executing the pivot function, because pivot overwrite the basis_var_index
next_entering_var_index = get_complement(basic_var_indices[pivot_row_index], n)
pivot_col_index = 2*n
_pivot_on_tableau(tableau, pivot_row_index, pivot_col_index)
basic_var_indices[pivot_row_index] = pivot_col_index
if verbose:
print("The tableau after pivoting is: ")
print(tableau)
for itr in range(max_pivots):
entering_var_index = next_entering_var_index
pivot_col_index = entering_var_index
# determine the next leaving variable (choose row)
q = tableau[:, -1]
ratios = np.where(np.abs(tableau[:, pivot_col_index]) < zero_tol, np.inf, q / tableau[:, pivot_col_index])
# deal with the negative ratios
ratios = np.where(ratios < 0, np.inf, ratios)
# if all the values in ratios are infinties, we can say it's a ray termination?
if np.all(ratios >= np.inf*np.ones(n)):
if verbose: print("Ray termination.")
return LcpSolution(w=None, z=None, basis=None, status=LcpSolverStatus.LCP_RAYTERMINATION)
pivot_row_index = _lexico_minimum_ratio_test(tableau, pivot_col_index, basic_var_indices, zero_tol)
leaving_var_index = basic_var_indices[pivot_row_index]
if verbose:
print("\n------------------ Iteration", itr, "------------------")
print("Leaving variable is: ", get_variable_name(basic_var_indices[pivot_row_index], n))
print("Entering variable is: ", get_variable_name(entering_var_index, n))
_pivot_on_tableau(tableau, pivot_row_index, pivot_col_index)
basic_var_indices[pivot_row_index] = pivot_col_index
if verbose:
print("Current tableau after pivoting is: ")
print(tableau)
if leaving_var_index == 2*n: # z0 leaves, solution found
if verbose:
print("\nFeasible solution found!")
print("basis is: ", basic_var_indices)
break
# compute the complementary index
next_entering_var_index = get_complement(leaving_var_index, n)
# Exceed max iteration
if itr == max_pivots - 1:
print("Max Iterations Exceeded.")
return LcpSolution(w=None, z=None, basis=None, status=LcpSolverStatus.LCP_PRETERMINATED)
q = tableau[:, -1]
wz = np.zeros(2*n)
wz[basic_var_indices] = q
return LcpSolution(w=wz[0:n], z=wz[n:2*n], basis=basic_var_indices, status=LcpSolverStatus.LCP_FEASIBLE)
def _pivot_on_tableau(tableau: Matrix, pivot_row_index: int, pivot_col_index: int):
tableau[pivot_row_index, :] /= tableau[pivot_row_index, pivot_col_index]
n = tableau.shape[0]
for i in range(n):
if i != pivot_row_index:
tableau[i, :] -= tableau[i, pivot_col_index] * tableau[pivot_row_index, :]
def _lexico_minimum_ratio_test(tableau: Matrix, pivot_col_index: int, basic_var_indices: List[int], zero_tol: float = tolerances['zero']) -> int:
"""
Find the row index of the lexico minimum of tableau.
In non-degenerate case, just do minimum ratio test.
Lexico minimum ratio test is only carried out in degenerate case.
"""
n = tableau.shape[0]
# If non-degenerate, just do minimum ratio test
# determine the next leaving variable (choose row)
ratios = np.ones(n)
q = tableau[:, -1]
ratios = np.where(np.abs(tableau[:, pivot_col_index]) < zero_tol, np.inf, q / tableau[:, pivot_col_index])
ratios = np.where(ratios < 0, np.inf, ratios)
# Test if there are multiple minimums in ratios (degenrate in q)
min_ratios = np.min(ratios)
min_count = np.count_nonzero(ratios == min_ratios)
if min_count == 1:
return int(np.argmin(ratios))
else:
tableau_basis_column = tableau[:, basic_var_indices]
beta = np.linalg.solve(tableau_basis_column, np.eye(n)) # ? factorize the matrix, might encounter numerical issue?
q_bar = np.reshape(beta @ q, (-1, 1))
q_bar_and_beta = np.hstack((q_bar, beta)) # [beta*q, beta]
c_bar_j = beta @ tableau[:, pivot_col_index]
positive_c_bar_j_index = np.where(c_bar_j > zero_tol)[0]
# ? If there're multiple lexico_argmin, return the first one. Does this cause problem?
return lexico_argmin(q_bar_and_beta[positive_c_bar_j_index, :])[0]