#!/usr/bin/env python3
# Conway's Game of Chess
# Copyright (C) 2023 Eric Lesiuta
import argparse
import atexit
import curses
import hashlib
import os
import pickle
import socket
import sys
import textwrap
import time
import traceback
def exit_handler(engine, engine_state, conn, *args) -> None:
    """Report an uncaught exception, save the game and close the connection.

    Meant to be installed as ``sys.excepthook`` so the game is still saved
    when an exception ends the program.

    Args:
        engine: object exposing ``recent_moves_str`` (a list of strings) and
            ``args.save`` (path of the save file).
        engine_state: the object pickled into the save file.
        conn: an object with ``close()``, or a falsy value when there is no
            network connection.
        *args: the ``(type, value, traceback)`` triple passed to an
            excepthook.
    """
    try:
        type_, value, tb = args
        # print the formatted stack trace; printing the traceback object
        # itself only shows "<traceback object at 0x...>"
        traceback.print_exception(type_, value, tb, file=sys.stderr)
        print(" ".join(engine.recent_moves_str))
        with open(engine.args.save, "wb") as f:
            pickle.dump(engine_state, f)
    finally:
        # always release the connection, even if reporting or saving failed
        if conn:
            conn.close()
def start_cli() -> int:
    """Parse the command line, set up networking and saving, then run the game.

    The instructions (the parser epilog) are printed and the user is asked to
    press enter before anything else happens. When hosting or joining, a TCP
    connection is established and the colours are agreed on. The engine
    history is created (or loaded from ``--load``), handlers that print the
    recent moves and save the history are registered with ``atexit`` and
    ``sys.excepthook``, and finally ``main_loop`` is run inside
    ``curses.wrapper``.

    Returns:
        The value returned by ``main_loop``, or 1 when the arguments are
        invalid, the terminal is too small, an unexpected exception escapes
        the main loop, or curses keeps failing for 30 attempts.
    """
    # cli
    parser = argparse.ArgumentParser(description="Conway’s game of chess")
    # NOTE(review): the rows of the legend box below have differing widths, so
    # its column alignment looks lost -- confirm against the intended layout.
    parser.epilog = textwrap.dedent("""
    Conway’s game of chess is a chess variant where the pieces can reproduce and die.
    Legend: White birth queue ┐
    White: P R P N P B P Q P B P N P R <─┘
    ┌──────────────────────────────────────┐
    │ # <─ White birth COUNTER on empty w │
    │ squares, born from queue on ^ │
    │ next turn after reaching 2 │ │
    │ │ │
    │ INDICATOR that white has exactly ┘ │
    │ 3 nearby pieces, birth counter │
    │ will increment at the start of the │
    │ next turn, black birth counter and │
    │ indicator are below and separate │
    │ │
    │ # ♔ <─ Piece symbol o │
    │ ^ ^ │
    │ └ Death COUNTER on occupied │ │
    │ squares, dies after reaching 3 │ │
    │ │ │
    │ INDICATOR that the piece has > 3 ┘ │
    │ nearby pieces (overpopulation), │
    │ or < 2 nearby pieces │
    │ (underpopulation) and will die │
    │ │
    │ # <─ Black COUNTER & INDICATOR ──> l │
    └──────────────────────────────────────┘
    The INDICATORs are updated immediately when the conditions are met.
    The COUNTERs are incremented only at the start of the respective player’s turn.
    Births and deaths also only occur at the start of the respective player’s turn.
    If the conditions for a birth or death counter are no longer met, (as shown by the indicators), the counter resets.
    Opponent pieces are not counted as nearby pieces for the birth/death population criteria.
    On birth, pieces are taken from the birth queue (circular) and placed on the board in order of rank then file.
    Placement starts from rank 1 for white and rank 8 for black, with both filling the board from left to right.
    The game ends when the king is captured or perishes due to over/underpopulation.
    """)
    parser.formatter_class = argparse.RawDescriptionHelpFormatter
    # option names need the ASCII "--" prefix so that argparse exposes them as
    # args.flip, args.save, ... which the rest of the program reads
    parser.add_argument("--flip", action="store_true", help="flip the board")
    parser.add_argument("--save", action="store", metavar="FILE", default="conway_chess.pickle", help="save file location")
    parser.add_argument("--load", action="store", metavar="FILE", help="load a save file")
    parser.add_argument("--host", nargs=3, metavar=("HOST", "PORT", "COLOR"), help="host a game")
    parser.add_argument("--join", nargs=2, metavar=("HOST", "PORT"), help="join a game")
    parser.add_argument("--ascii", action="store_true", help="use ascii characters for pieces")
    parser.add_argument("--light", action="store_true", help="flip unicode piece colors for light terminals")
    args = parser.parse_args()

    # print instructions before playing
    print(parser.epilog)
    _ = input("Press enter to play")

    # networking
    conn, my_colour = None, None
    if args.host and args.join:
        print("You can only host or join a game, not both", file=sys.stderr)
        return 1
    if args.host:
        host, port, colour = args.host
        if colour not in ("white", "black"):
            print("You can only host as white or black", file=sys.stderr)
            return 1
        # the listening socket is only needed until the opponent connects, so
        # it is closed as soon as accept() returns
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            # listener = ssl._create_unverified_context().wrap_socket(listener, server_side=True)
            listener.bind((host, int(port)))
            listener.listen()
            conn, _addr = listener.accept()
        # tell the joining player which colour the host plays
        conn.sendall(colour.encode())
        my_colour = colour
        atexit.register(conn.close)
    elif args.join:
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # conn = ssl._create_unverified_context().wrap_socket(conn, server_side=False)
        conn.connect((args.join[0], int(args.join[1])))
        # "white" and "black" are both 5 bytes long; take the other colour
        host_colour = conn.recv(5).decode()
        my_colour = "black" if host_colour == "white" else "white"
        atexit.register(conn.close)

    # engine initialization
    engine = Engine(args)
    engine_state = [pickle.dumps(engine)]
    engine_state_redo = []
    if args.load:
        # NOTE: unpickling executes code embedded in the file, so only load
        # save files from a trusted source.
        with open(args.load, "rb") as f:
            engine_state = pickle.load(f)
        # re-pickle every snapshot with the current command-line arguments
        for i, snapshot in enumerate(engine_state):
            restored = pickle.loads(snapshot)
            restored.args = args
            engine_state[i] = pickle.dumps(restored)
        engine = pickle.loads(engine_state[-1])

    def save_game() -> None:
        """Write the engine history to the --save location."""
        with open(args.save, "wb") as f:
            pickle.dump(engine_state, f)

    atexit.register(lambda: print(" ".join(engine.recent_moves_str)))
    atexit.register(save_game)
    sys.excepthook = lambda *exc_info: exit_handler(engine, engine_state, conn, *exc_info)

    # stockfish is optional: fall back to None when the package is missing or
    # it cannot be started, without swallowing KeyboardInterrupt/SystemExit
    try:
        from stockfish import Stockfish
        stockfish = Stockfish()
    except Exception:
        stockfish = None

    # check terminal size
    columns, lines = os.get_terminal_size()
    if engine.height > lines:
        print(f"Terminal height ({lines}) is too short by {engine.height - lines} lines", file=sys.stderr)
        return 1
    if engine.width > columns:
        print(f"Terminal width ({columns}) is too narrow by {engine.width - columns} columns", file=sys.stderr)
        return 1

    # main curses loop: retry once a second after a curses error, giving the
    # user up to 30 seconds to resize the terminal
    for err_count in reversed(range(30)):
        try:
            return curses.wrapper(main_loop, engine, engine_state, engine_state_redo, stockfish, conn, my_colour)
        except curses.error as e:
            print("CURSES ERROR: %s" % e, file=sys.stderr)
            print("try resizing your terminal, game will quit in %s seconds" % (err_count + 1), file=sys.stderr)
            time.sleep(1)
        except Exception as e:
            print("ERROR: %s" % e, file=sys.stderr)
            return 1
    return 1
# colour pair used for a counter digit, keyed by the digit character
_DEATH_COUNTER_PAIRS = {"0": 7, "1": 7, "2": 7, "3": 8}
_BIRTH_COUNTER_PAIRS = {"0": 9, "1": 9, "2": 6}


def _recv_exact(conn, size: int) -> bytes:
    """Read exactly ``size`` bytes from ``conn``; raise ConnectionError if it closes first."""
    buffer = bytearray()
    while len(buffer) < size:
        chunk = conn.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError("connection closed by the other player")
        buffer.extend(chunk)
    return bytes(buffer)


def _moves_digest(engine) -> bytes:
    """Return the last two bytes of the SHA-256 of the pickled move list."""
    return hashlib.sha256(pickle.dumps(engine.recent_moves_str)).digest()[-2:]


def _cell_colour_pair(engine, char, x: int, y: int):
    """Return the colour pair number for one display cell.

    ``None`` means the cell is a piece drawn with its unicode glyph: the
    caller substitutes the glyph and leaves the current attribute untouched.
    """
    in_board_rows = y > 4 and y < engine.height - 2
    if in_board_rows:
        # pieces
        if char in "RNBQKP":
            if engine.use_unicode:
                return None
            return 4 if char == "K" else 2
        if char in "rnbqkp":
            if engine.use_unicode:
                return None
            return 5 if char == "k" else 3
        # indicators
        if char in "wl":
            return 9
        if char in "ou":
            return 7
        # counters: every fourth row (offset 2) holds death counters, the
        # other rows hold birth counters
        if x > 1 and x < engine.width - 1:
            table = _DEATH_COUNTER_PAIRS if (y - 5) % 4 == 2 else _BIRTH_COUNTER_PAIRS
            if char in table:
                return table[char]
    # board
    return 0


def main_loop(stdscr, engine: "Engine", engine_state: list[bytes], engine_state_redo: list[bytes], stockfish, conn, my_colour) -> int:
    """Main loop for the curses implementation of the game.

    Each iteration redraws ``engine.display(my_colour)`` with colours, reads
    one key (from the keyboard, or from the network when it is the other
    player's turn), and feeds it to the engine.

    Args:
        stdscr: the curses window to draw on and read keys from.
        engine: the current game engine.
        engine_state: history of pickled engines; the last entry is the
            current position. Appended to after every state change.
        engine_state_redo: pickled engines that were undone and can be redone.
        stockfish: passed through to ``engine.update_state``.
        conn: connected socket of a network game, or a falsy value.
        my_colour: colour played locally in a network game, passed to
            ``engine.display``.

    Returns:
        0 when the player quits with ESC or "q".

    Raises:
        ConnectionError: if the connection closes while waiting for a move.
        AssertionError: if the two players' move lists are out of sync.
    """
    curses.cbreak()
    curses.noecho()
    # the colour pairs never change, so define them once rather than on every redraw
    curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)
    curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_WHITE)
    curses.init_pair(3, curses.COLOR_WHITE, curses.COLOR_BLUE)
    curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_WHITE)
    curses.init_pair(5, curses.COLOR_YELLOW, curses.COLOR_BLUE)
    curses.init_pair(6, curses.COLOR_GREEN, curses.COLOR_BLACK)
    curses.init_pair(7, curses.COLOR_YELLOW, curses.COLOR_BLACK)
    curses.init_pair(8, curses.COLOR_RED, curses.COLOR_BLACK)
    curses.init_pair(9, curses.COLOR_CYAN, curses.COLOR_BLACK)
    key_names = {
        # NOTE(review): SOURCE showed ord("n") here; the result "enter"
        # indicates a lost backslash, so the newline character is used.
        ord("\n"): "enter",
        ord(" "): "enter",
        ord("s"): "stockfish",
        curses.KEY_UP: "up",
        curses.KEY_DOWN: "down",
        curses.KEY_LEFT: "left",
        curses.KEY_RIGHT: "right",
    }
    while True:
        # refresh screen
        current_display = engine.display(my_colour)
        stdscr.clear()
        stdscr.attrset(curses.color_pair(0))
        for y, line in enumerate(current_display):
            for x, char in enumerate(line):
                pair = _cell_colour_pair(engine, char, x, y)
                if pair is None:
                    char = engine.unicode_replacements[char]
                else:
                    stdscr.attrset(curses.color_pair(pair))
                stdscr.addstr(y, x, char)
        stdscr.move(*engine.get_cursor())
        stdscr.refresh()

        # check for key press, sync with network player if connected, and update engine state
        if conn:
            if my_colour == engine.current_turn:
                ch = stdscr.getch()
                ch = engine.flip_cursor_y(ch, curses.KEY_UP, curses.KEY_DOWN)
                # 2 bytes of key code followed by 2 bytes of move-list digest
                conn.sendall(ch.to_bytes(2, "big") + _moves_digest(engine))
            else:
                # recv() may return fewer bytes than requested, so read until
                # the whole 4-byte message has arrived
                msg = _recv_exact(conn, 4)
                ch = int.from_bytes(msg[:2], "big")
                if msg[2:] != _moves_digest(engine):
                    # raised explicitly so the check also runs under "python -O"
                    raise AssertionError("client and server are out of sync")
        else:
            ch = stdscr.getch()
            ch = engine.flip_cursor_y(ch, curses.KEY_UP, curses.KEY_DOWN)

        # NOTE(review): SOURCE had no indentation; "continue" is assumed to
        # apply whether or not there was a state to undo/redo.
        # NOTE(review): rebinding ``engine`` below only affects this function;
        # callers keep their reference to the engine they passed in.
        if ch == curses.KEY_BACKSPACE or ch == ord("u"):
            # undo: keep at least the initial state in the history
            if len(engine_state) >= 2:
                engine_state_redo.append(engine_state.pop())
                engine = pickle.loads(engine_state[-1])
            continue
        if ch == ord("r"):
            # redo
            if engine_state_redo:
                engine_state.append(engine_state_redo.pop())
                engine = pickle.loads(engine_state[-1])
            continue
        if ch == 27 or ch == ord("q"):
            # 27 is the ESC key
            return 0

        key = key_names.get(ch, "other")
        if engine.update_state(key, stockfish):
            engine_state.append(pickle.dumps(engine))
            # a new state invalidates anything that could have been redone
            engine_state_redo = []
class Engine: |
def __init__(self, args) -> None:
    """Set up a new game.

    Args:
        args: parsed command-line options; ``ascii`` and ``light`` are read
            here, and the whole object is stored on ``self.args`` and
            passed to ``Board``.
    """
    self.args = args
    self.board = Board(self.args)
    # tick all the pieces for the first turn
    for piece in self.board.get_pieces():
        piece.tick(self.board.get_surrounding_pieces(piece), "white", True)
    self.cursor_row = 0
    self.cursor_col = 0
    # display() adds 5 rows (3 headers and 2 birth queues) around the board
    self.height = len(self.board.display()) + 5
    self.width = len(self.board.display()[0])
    # both players start with the same birth queue, each with its own list
    birth_order = ["P", "R", "P", "N", "P", "B", "P", "Q", "P", "B", "P", "N", "P", "R"]
    self.white_birth_queue = list(birth_order)
    self.black_birth_queue = list(birth_order)
    self.selected_piece = None
    self.current_turn = "white"
    self.col_labels = ["a", "b", "c", "d", "e", "f", "g", "h"]
    self.recent_moves = []
    self.recent_moves_str = []
    self.game_over_message = None
    self.use_unicode = not self.args.ascii
    self.unicode_pieces = "♟♜♞♝♛♚♙♖♘♗♕♔"
    self.ascii_pieces = "PRNBQKprnbqk"
    if self.args.light:
        # swap which letter case maps to the filled / outlined glyphs
        self.ascii_pieces = "prnbqkPRNBQK"
    # letter -> glyph, pairing the two strings above position by position
    self.unicode_replacements = dict(zip(self.ascii_pieces, self.unicode_pieces))
    # sanity check: the cached size must match what display() really produces
    assert self.height == len(self.display(None))
    assert self.width == len(self.display(None)[0])
def get_cursor(self) -> tuple[int, int]:
    """Get the position of the cursor in terms of display row and column.

    Returns:
        ``(row, column)`` on the screen for the board square at
        ``self.cursor_row`` / ``self.cursor_col``. With ``self.args.flip``
        set, the rows are mirrored (row 0 and row 7 swap places).
    """
    # squares are 4 display rows and 6 display columns apart; the offsets 7
    # and 4 presumably skip the header rows and the left border -- TODO
    # confirm against Board.display
    board_row = 7 - self.cursor_row if self.args.flip else self.cursor_row
    real_row = board_row * 4 + 7
    real_col = self.cursor_col * 6 + 4
    return real_row, real_col
def flip_cursor_y(self, ch: int, key_up: int, key_down: int) -> int:
    """Flip the key press for up and down when the board is flipped.

    Args:
        ch: the key code that was read.
        key_up: the key code meaning "up".
        key_down: the key code meaning "down".

    Returns:
        ``ch`` unchanged unless ``self.args.flip`` is set and ``ch`` is one
        of the two vertical keys, in which case the opposite key is returned.
    """
    if not self.args.flip:
        return ch
    if ch == key_up:
        return key_down
    if ch == key_down:
        return key_up
    return ch
def display(self, my_colour) -> list:
    """Build the full screen as a list of rows, each a list of characters.

    Takes the rows from ``self.board.display()`` and adds, in order: a
    status line (game-over message or current turn), the selected piece,
    the last three moves, one birth queue above the board and the other
    below it. Every added line is centred to ``self.width``.

    Args:
        my_colour: the local player's colour; when it equals the current
            turn, " (your turn)" is appended to the status line.

    Returns:
        The list returned by ``self.board.display()`` with the five extra
        rows inserted.
    """
    board = self.board.display()
    width = self.width

    # status line
    if self.game_over_message is not None:
        header = f"Game over: {self.game_over_message}"
    else:
        your_turn = " (your turn)" if my_colour == self.current_turn else ""
        header = f"Current turn: {self.current_turn}{your_turn}"

    # selected piece, shown with its square (e.g. column label + 1-based row)
    if self.selected_piece is None:
        selected = "Selected: None"
    else:
        piece = self.selected_piece
        selected = f"Selected: {piece}{self.col_labels[piece.col]}{piece.row + 1}"

    recent = f"Recent moves: {' | '.join(self.recent_moves_str[-3:])}"

    for index, text in enumerate((header, selected, recent)):
        board.insert(index, list(text.center(width, " ")))

    # birth queues; the black queue stores upper-case letters, so they are
    # lower-cased to look up the black glyphs
    if self.use_unicode:
        white_pieces = [self.unicode_replacements[piece] for piece in self.white_birth_queue]
        black_pieces = [self.unicode_replacements[piece.lower()] for piece in self.black_birth_queue]
    else:
        white_pieces = self.white_birth_queue
        black_pieces = self.black_birth_queue
    white_queue = list(f"White: {' '.join(white_pieces)}".center(width, " "))
    black_queue = list(f"Black: {' '.join(black_pieces)}".center(width, " "))

    # the queue shown above the board swaps when the board is flipped
    if self.args.flip:
        above, below = black_queue, white_queue
    else:
        above, below = white_queue, black_queue
    board.insert(3, above)
    board.append(below)
    return board
def move_is_valid(self, source, dest, stockfish) -> bool: |
source_row = source.row + 1 |
source_col = self.col_labels[source.col] |
dest_row = dest.row + 1 |
dest_col = self.col_labels[dest.col] |
move_str = f”{source}{source_col}{source_row}->{dest}{dest_col}{dest_row}“ |
if source.move_is_valid(dest): |
if stockfish is not None: |
try: |
stockfish.set_fen_position(self.board.get_fen_position(self.current_turn)) |
if stockfish.is_move_correct(f”{source_col}{source_row}{dest_col}{dest_row}“): |
self.recent_moves.append((source, dest)) |