|
#!/usr/bin/env python3 |
|
# Conway’s Game of Chess |
|
# Copyright (C) 2023 Eric Lesiuta |
|
|
|
import argparse |
|
import atexit |
|
import curses |
|
import hashlib |
|
import os |
|
import pickle |
|
import textwrap |
|
import time |
|
import socket |
|
import sys |
|
|
|
|
|
def exit_handler(engine, engine_state, conn, *args) -> None: |
|
“””clean up in the event of an exception and atexit functions aren’t called””” |
|
type_, value, traceback = args |
|
print(type_, value, traceback, file=sys.stderr) |
|
print(” “.join(engine.recent_moves_str)) |
|
with open(engine.args.save, “wb”) as f: |
|
pickle.dump(engine_state, f) |
|
if conn: |
|
conn.close() |
|
|
|
def start_cli() -> int: |
|
# cli |
|
parser = argparse.ArgumentParser(description=“Conway’s game of chess”) |
|
parser.epilog = textwrap.dedent(“”” |
|
Conway’s game of chess is a chess variant where the pieces can reproduce and die. |
|
Legend: White birth queue ┐ |
|
White: P R P N P B P Q P B P N P R <─┘ |
|
┌──────────────────────────────────────┐ |
|
│ # <─ White birth COUNTER on empty w │ |
|
│ squares, born from queue on ^ │ |
|
│ next turn after reaching 2 │ │ |
|
│ │ │ |
|
│ INDICATOR that white has exactly ┘ │ |
|
│ 3 nearby pieces, birth counter │ |
|
│ will increment at the start of the │ |
|
│ next turn, black birth counter and │ |
|
│ indicator are below and separate │ |
|
│ │ |
|
│ # ♔ <─ Piece symbol o │ |
|
│ ^ ^ │ |
|
│ └ Death COUNTER on occupied │ │ |
|
│ squares, dies after reaching 3 │ │ |
|
│ │ │ |
|
│ INDICATOR that the piece has > 3 ┘ │ |
|
│ nearby pieces (overpopulation), │ |
|
│ or < 2 nearby pieces │ |
|
│ (underpopulation) and will die │ |
|
│ │ |
|
│ # <─ Black COUNTER & INDICATOR ──> l │ |
|
└──────────────────────────────────────┘ |
|
The INDICATORs are updated immediately when the conditions are met. |
|
The COUNTERs are incremented only at the start of the respective player’s turn. |
|
Births and deaths also only occur at the start of the respective player’s turn. |
|
If the conditions for a birth or death counter are no longer met, (as shown by the indicators), the counter resets. |
|
Opponent pieces are not counted as nearby pieces for the birth/death population criteria. |
|
On birth, pieces are taken from the birth queue (circular) and placed on the board in order of rank then file. |
|
Placement starts from rank 1 for white and rank 8 for black, with both filling the board from left to right. |
|
The game ends when the king is captured or perishes due to over/underpopulation. |
|
“””) |
|
parser.formatter_class = argparse.RawDescriptionHelpFormatter |
|
parser.add_argument(“–flip”, action=“store_true”, help=“flip the board”) |
|
parser.add_argument(“–save”, action=“store”, metavar=“FILE”, default=“conway_chess.pickle”, help=“save file location”) |
|
parser.add_argument(“–load”, action=“store”, metavar=“FILE”, help=“load a save file”) |
|
parser.add_argument(“–host”, nargs=3, metavar=(“HOST”, “PORT”, “COLOR”), help=“host a game”) |
|
parser.add_argument(“–join”, nargs=2, metavar=(“HOST”, “PORT”), help=“join a game”) |
|
parser.add_argument(“–ascii”, action=“store_true”, help=“use ascii characters for pieces”) |
|
parser.add_argument(“–light”, action=“store_true”, help=“flip unicode piece colors for light terminals”) |
|
args = parser.parse_args() |
|
# print instructions before playing |
|
print(parser.epilog) |
|
_ = input(“Press enter to play”) |
|
# networking |
|
conn, my_colour = None, None |
|
if args.host and args.join: |
|
print(“You can only host or join a game, not both”, file=sys.stderr) |
|
return 1 |
|
if args.host: |
|
if args.host[2] not in (“white”, “black”): |
|
print(“You can only host as white or black”, file=sys.stderr) |
|
return 1 |
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
# s = ssl._create_unverified_context().wrap_socket(s, server_side=True) |
|
s.bind((args.host[0], int(args.host[1]))) |
|
s.listen() |
|
conn, addr = s.accept() |
|
conn.sendall(args.host[2].encode()) |
|
my_colour = args.host[2] |
|
atexit.register(conn.close) |
|
elif args.join: |
|
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
# conn = ssl._create_unverified_context().wrap_socket(conn, server_side=False) |
|
conn.connect((args.join[0], int(args.join[1]))) |
|
host_colour = conn.recv(5).decode() |
|
if host_colour == “white”: |
|
my_colour = “black” |
|
else: |
|
my_colour = “white” |
|
atexit.register(conn.close) |
|
# engine initialization |
|
engine = Engine(args) |
|
engine_state = [pickle.dumps(engine)] |
|
engine_state_redo = [] |
|
if args.load: |
|
engine_state = pickle.load(open(args.load, “rb”)) |
|
for i in range(len(engine_state)): |
|
engine_state[i] = pickle.loads(engine_state[i]) |
|
engine_state[i].args = args |
|
engine_state[i] = pickle.dumps(engine_state[i]) |
|
engine = pickle.loads(engine_state[–1]) |
|
atexit.register(lambda: print(” “.join(engine.recent_moves_str))) |
|
atexit.register(lambda: pickle.dump(engine_state, open(args.save, “wb”))) |
|
sys.excepthook = lambda *args: exit_handler(engine, engine_state, conn, *args) |
|
try: |
|
from stockfish import Stockfish |
|
stockfish = Stockfish() |
|
except: |
|
stockfish = None |
|
# check terminal size |
|
columns, lines = os.get_terminal_size() |
|
assert engine.height <= lines, f”Terminal height ({lines}) is too short by {engine.height – lines} lines” |
|
assert engine.width <= columns, f”Terminal width ({columns}) is too narrow by {engine.width – columns} columns” |
|
# main curses loop |
|
for err_count in reversed(range(30)): |
|
try: |
|
return curses.wrapper(main_loop, engine, engine_state, engine_state_redo, stockfish, conn, my_colour) |
|
except curses.error as e: |
|
print(“CURSES ERROR: %s” % e, file=sys.stderr) |
|
print(“try resizing your terminal, game will quit in %s seconds” % (err_count + 1), file=sys.stderr) |
|
time.sleep(1) |
|
except Exception as e: |
|
print(“ERROR: %s” % e, file=sys.stderr) |
|
return 1 |
|
return 1 |
|
|
|
def main_loop(stdscr, engine: “Engine”, engine_state: list[bytes], engine_state_redo: list[bytes], stockfish, conn, my_colour) -> int: |
|
“””main loop for the curses implementation of the game””” |
|
curses.cbreak() |
|
curses.noecho() |
|
while True: |
|
# refresh screen |
|
current_display = engine.display(my_colour) |
|
stdscr.clear() |
|
curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE) |
|
curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE) |
|
curses.init_pair(3, curses.COLOR_WHITE, curses.COLOR_BLUE) |
|
curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_WHITE) |
|
curses.init_pair(5, curses.COLOR_YELLOW, curses.COLOR_BLUE) |
|
curses.init_pair(6, curses.COLOR_GREEN, curses.COLOR_BLACK) |
|
curses.init_pair(7, curses.COLOR_YELLOW, curses.COLOR_BLACK) |
|
curses.init_pair(8, curses.COLOR_RED, curses.COLOR_BLACK) |
|
curses.init_pair(9, curses.COLOR_CYAN, curses.COLOR_BLACK) |
|
stdscr.attrset(curses.color_pair(0)) |
|
y = 0 |
|
for line in current_display: |
|
for x, char in enumerate(line): |
|
# pieces |
|
if char in “RNBQKP” and y > 4 and y < engine.height – 2: |
|
if engine.use_unicode: |
|
char = engine.unicode_replacements[char] |
|
else: |
|
if char == “K”: |
|
stdscr.attrset(curses.color_pair(4)) |
|
else: |
|
stdscr.attrset(curses.color_pair(2)) |
|
elif char in “rnbqkp” and y > 4 and y < engine.height – 2: |
|
if engine.use_unicode: |
|
char = engine.unicode_replacements[char] |
|
else: |
|
if char == “k”: |
|
stdscr.attrset(curses.color_pair(5)) |
|
else: |
|
stdscr.attrset(curses.color_pair(3)) |
|
# indicators |
|
elif char in “wl” and y > 4 and y < engine.height – 2: |
|
stdscr.attrset(curses.color_pair(9)) |
|
elif char in “ou” and y > 4 and y < engine.height – 2: |
|
stdscr.attrset(curses.color_pair(7)) |
|
# death counters |
|
elif char == “0” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 == 2: |
|
stdscr.attrset(curses.color_pair(7)) |
|
elif char == “1” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 == 2: |
|
stdscr.attrset(curses.color_pair(7)) |
|
elif char == “2” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 == 2: |
|
stdscr.attrset(curses.color_pair(7)) |
|
elif char == “3” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 == 2: |
|
stdscr.attrset(curses.color_pair(8)) |
|
# birth counters |
|
elif char == “0” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 != 2: |
|
stdscr.attrset(curses.color_pair(9)) |
|
elif char == “1” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 != 2: |
|
stdscr.attrset(curses.color_pair(9)) |
|
elif char == “2” and x > 1 and x < engine.width – 1 and y > 4 and y < engine.height – 2 and (y – 5) % 4 != 2: |
|
stdscr.attrset(curses.color_pair(6)) |
|
# board |
|
else: |
|
stdscr.attrset(curses.color_pair(0)) |
|
stdscr.addstr(y, x, char) |
|
y += 1 |
|
stdscr.move(*engine.get_cursor()) |
|
stdscr.refresh() |
|
# check for key press, sync with network player if connected, and update engine state |
|
key = None |
|
if conn: |
|
if my_colour == engine.current_turn: |
|
ch: int = stdscr.getch() |
|
ch = engine.flip_cursor_y(ch, curses.KEY_UP, curses.KEY_DOWN) |
|
conn.sendall(ch.to_bytes(2, “big”) + hashlib.sha256(pickle.dumps(engine.recent_moves_str)).digest()[–2:]) |
|
else: |
|
msg = conn.recv(4) |
|
ch = int.from_bytes(msg[:2], “big”) |
|
hash_lsb = msg[2:] |
|
assert hash_lsb == hashlib.sha256(pickle.dumps(engine.recent_moves_str)).digest()[–2:], f”client and server are out of sync” |
|
else: |
|
ch: int = stdscr.getch() |
|
ch = engine.flip_cursor_y(ch, curses.KEY_UP, curses.KEY_DOWN) |
|
if ch == ord(“n“) or ch == ord(” “): |
|
key = “enter” |
|
elif ch == ord(“s”): |
|
key = “stockfish” |
|
elif ch == curses.KEY_UP: |
|
key = “up” |
|
elif ch == curses.KEY_DOWN: |
|
key = “down” |
|
elif ch == curses.KEY_LEFT: |
|
key = “left” |
|
elif ch == curses.KEY_RIGHT: |
|
key = “right” |
|
elif ch == curses.KEY_BACKSPACE or ch == ord(“u”): |
|
if len(engine_state) >= 2: |
|
engine_state_redo.append(engine_state.pop()) |
|
engine = pickle.loads(engine_state[–1]) |
|
continue |
|
elif ch == ord(“r”): |
|
if engine_state_redo: |
|
engine_state.append(engine_state_redo.pop()) |
|
engine = pickle.loads(engine_state[–1]) |
|
continue |
|
elif ch == 27 or ch == ord(“q”): |
|
key = “esc” |
|
return 0 |
|
else: |
|
key = “other” |
|
if engine.update_state(key, stockfish): |
|
engine_state.append(pickle.dumps(engine)) |
|
engine_state_redo = [] |
|
|
|
|
|
class Engine: |
|
def __init__(self, args) -> None: |
|
self.args = args |
|
self.board = Board(self.args) |
|
# tick all the pieces for the first turn |
|
for piece in self.board.get_pieces(): |
|
piece.tick(self.board.get_surrounding_pieces(piece), “white”, True) |
|
self.cursor_row = 0 |
|
self.cursor_col = 0 |
|
self.height = len(self.board.display()) + 5 |
|
self.width = len(self.board.display()[0]) |
|
self.white_birth_queue = [“P”, “R”, “P”, “N”, “P”, “B”, “P”, “Q”, “P”, “B”, “P”, “N”, “P”, “R”] |
|
self.black_birth_queue = [“P”, “R”, “P”, “N”, “P”, “B”, “P”, “Q”, “P”, “B”, “P”, “N”, “P”, “R”] |
|
self.selected_piece = None |
|
self.current_turn = “white” |
|
self.col_labels = [“a”, “b”, “c”, “d”, “e”, “f”, “g”, “h”] |
|
self.recent_moves = [] |
|
self.recent_moves_str = [] |
|
self.game_over_message = None |
|
self.use_unicode = not self.args.ascii |
|
self.unicode_pieces = “♟♜♞♝♛♚♙♖♘♗♕♔” |
|
self.ascii_pieces = “PRNBQKprnbqk” |
|
if self.args.light: |
|
self.ascii_pieces = “prnbqkPRNBQK” |
|
self.unicode_replacements = dict(zip(self.ascii_pieces, self.unicode_pieces)) |
|
assert self.height == len(self.display(None)) |
|
assert self.width == len(self.display(None)[0]) |
|
|
|
def get_cursor(self) -> tuple[int, int]: |
|
“””get the position of the cursor in terms of display row and column””” |
|
if self.args.flip: |
|
real_row = (7 – self.cursor_row) * 4 + 7 |
|
else: |
|
real_row = self.cursor_row * 4 + 7 |
|
real_col = self.cursor_col * 6 + 4 |
|
return real_row, real_col |
|
|
|
def flip_cursor_y(self, ch: int, key_up: int, key_down: int) -> int: |
|
“””flip the key press for up and down””” |
|
if self.args.flip: |
|
if ch == key_up: |
|
return key_down |
|
elif ch == key_down: |
|
return key_up |
|
return ch |
|
|
|
def display(self, my_colour) -> list: |
|
board = self.board.display() |
|
if self.game_over_message is not None: |
|
header = f”Game over: {self.game_over_message}“.center(self.width, ” “) |
|
else: |
|
header = f”Current turn: {self.current_turn}{‘ (your turn)’ if my_colour == self.current_turn else ”}“.center(self.width, ” “) |
|
board.insert(0, list(header)) |
|
if self.selected_piece is None: |
|
header_2 = “Selected: None”.center(self.width, ” “) |
|
else: |
|
header_2 = f”Selected: {self.selected_piece}{self.col_labels[self.selected_piece.col]}{self.selected_piece.row + 1}“.center(self.width, ” “) |
|
board.insert(1, list(header_2)) |
|
header_3 = f”Recent moves: {‘ | ‘.join(self.recent_moves_str[–3:])}“.center(self.width, ” “) |
|
board.insert(2, list(header_3)) |
|
if self.use_unicode: |
|
white_queue = f”White: {‘ ‘.join([self.unicode_replacements[piece] for piece in self.white_birth_queue])}“.center(self.width, ” “) |
|
black_queue = f”Black: {‘ ‘.join([self.unicode_replacements[piece.lower()] for piece in self.black_birth_queue])}“.center(self.width, ” “) |
|
else: |
|
white_queue = f”White: {‘ ‘.join(self.white_birth_queue)}“.center(self.width, ” “) |
|
black_queue = f”Black: {‘ ‘.join(self.black_birth_queue)}“.center(self.width, ” “) |
|
if self.args.flip: |
|
board.insert(3, list(black_queue)) |
|
board.append(list(white_queue)) |
|
else: |
|
board.insert(3, list(white_queue)) |
|
board.append(list(black_queue)) |
|
return board |
|
|
|
def move_is_valid(self, source, dest, stockfish) -> bool: |
|
source_row = source.row + 1 |
|
source_col = self.col_labels[source.col] |
|
dest_row = dest.row + 1 |
|
dest_col = self.col_labels[dest.col] |
|
move_str = f”{source}{source_col}{source_row}->{dest}{dest_col}{dest_row}“ |
|
if source.move_is_valid(dest): |
|
if stockfish is not None: |
|
try: |
|
stockfish.set_fen_position(self.board.get_fen_position(self.current_turn)) |
|
if stockfish.is_move_correct(f”{source_col}{source_row}{dest_col}{dest_row}“): |
|
self.recent_moves.append((source, dest)) |
|
─>─span>─span>─┘span>