"""Drive the PICO-8 cart ``hackcel.p8`` through ``xdotool`` and read its state.

The cart's process output is parsed by a background thread into a status
dictionary; key events are injected into the PICO-8 window with ``xdotool``.
"""

import subprocess
import time
import threading
import math


class Celeste:
    """Handle on a running PICO-8 process with the ``hackcel.p8`` cart loaded."""

    # Horizontal direction requested by each movement action.  "rght" is the
    # spelling advertised by ``action_space``; "right" is kept for callers
    # that already use it.
    _DIRECTIONS = {None: 0, "left": -1, "right": 1, "rght": 1}

    def __init__(self):
        """Launch PICO-8, locate its window, load and run the cart.

        Raises:
            RuntimeError: if ``xdotool search`` does not report exactly one
                window of class ``pico8``.
            subprocess.CalledProcessError: if ``xdotool search`` exits with a
                non-zero status.
        """
        # Start process.  An argument list avoids going through a shell;
        # stderr is merged into stdout, so the reader sees both.
        self.process = subprocess.Popen(
            ["bin/pico-8/linux/pico8"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

        # Wait for window to open and get window id
        time.sleep(2)
        winid = subprocess.check_output(
            ["xdotool", "search", "--class", "pico8"]
        ).decode("utf-8").strip().split("\n")
        if len(winid) != 1 or not winid[0]:
            raise RuntimeError("Could not find unique PICO-8 window id")
        self.winid = winid[0]

        # Load cartridge
        self.keystring("load hackcel.p8")
        self.keypress("Enter")
        self.keystring("run")
        self.keypress("Enter", post=1000)

        # Initialize variables
        self.internal_status = {}
        self.dead = False

        # -1: moving left
        #  0: not moving
        #  1: moving right
        self.moving = 0

        # Start state update thread.  It is a daemon so that a blocked
        # readline() cannot keep the interpreter alive after the main thread
        # has finished.
        self.update_thread = threading.Thread(
            target=self._update_loop,
            daemon=True,
        )
        self.update_thread.start()

    def act(self, action):
        """Apply a movement action by pressing/releasing arrow keys.

        Args:
            action: ``None`` (stop), ``"left"``, or ``"right"``/``"rght"``.
                Any other value leaves the current horizontal movement
                unchanged.

        NOTE(review): the jump/dash entries of ``action_space`` are not
        handled here yet; they only trigger the key releases below.
        """
        # Release every non-movement key on each step.
        for key in ("x", "c", "Down", "Up"):
            self.keyup(key)

        direction = self._DIRECTIONS.get(action, self.moving)

        # Release whichever arrow key is not wanted *before* pressing the new
        # one, so a direct left<->right switch never holds both at once.
        if direction != -1:
            self.keyup("Left")
        if direction != 1:
            self.keyup("Right")

        # Only press when the direction actually changes; the key is already
        # held otherwise.
        if direction != self.moving:
            if direction == -1:
                self.keydown("Left")
            elif direction == 1:
                self.keydown("Right")

        self.moving = direction

    @property
    def status(self):
        """Return the latest parsed game state as a dictionary.

        Keys: ``stage`` (looked up from the ``ry``/``rx`` values in a
        1x5 table), ``xpos``/``ypos`` (ints from ``px``/``py``) and
        ``xvel``/``yvel`` (floats from ``vx``/``vy``).

        Raises:
            KeyError: if the update thread has not received these values yet
                (e.g. right after start-up or ``reset()``).
        """
        raw = self.internal_status
        stage_grid = [
            [0, 1, 2, 3, 4]
        ]
        return {
            "stage": stage_grid[int(raw["ry"])][int(raw["rx"])],
            "xpos": int(raw["px"]),
            "ypos": int(raw["py"]),
            "xvel": float(raw["vx"]),
            "yvel": float(raw["vy"]),
        }

    # Possible actions
    @property
    def action_space(self):
        """Return the list of action names."""
        return [
            "left",  # move left
            "rght",  # move right
            "jump",  # jump
            "dshn",  # dash north
            "dshe",  # dash east
            "dshw",  # dash west
            "dsne",  # dash north-east
            "dsnw",  # dash north-west
        ]

    def keypress(self, key: str, *, post=200):
        """Send one key press (``xdotool key``), then sleep ``post`` ms."""
        subprocess.run(["xdotool", "key", "--window", self.winid, key])
        time.sleep(post / 1000)

    def keydown(self, key: str):
        """Hold ``key`` down in the PICO-8 window (``xdotool keydown``)."""
        subprocess.run(["xdotool", "keydown", "--window", self.winid, key])

    def keyup(self, key: str):
        """Release ``key`` in the PICO-8 window (``xdotool keyup``)."""
        subprocess.run(["xdotool", "keyup", "--window", self.winid, key])

    def keystring(self, string, *, delay=100, post=200):
        """Type ``string`` (``xdotool type``), then sleep ``post`` ms.

        ``delay`` is passed to xdotool's ``--delay`` option.
        """
        subprocess.run([
            "xdotool", "type",
            "--window", self.winid,
            "--delay", str(delay),
            string,
        ])
        time.sleep(post / 1000)

    def reset(self):
        """Restart the cart and clear the cached status.

        Escape is only sent when the update thread has not already done so
        on death (``self.dead``).
        """
        self.internal_status = {}
        if not self.dead:
            self.keypress("Escape")
        self.keystring("run")
        self.keypress("Enter", post=1000)
        self.dead = False

    def _update_loop(self):
        """Read the process output line by line and update ``internal_status``.

        Each line is split on ``;`` into ``key:value`` entries.  When the
        ``dc`` value is present and not ``"0"``, Escape is sent, ``dead`` is
        set and output is discarded until a ``!RESTART`` line is seen.
        Returns when the stream reaches end-of-file.
        """
        stdout = self.process.stdout

        # The pipe yields bytes, so the end-of-file sentinel must be b"".
        for raw in iter(stdout.readline, b""):
            line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
            if line == "!RESTART":
                continue

            for entry in line.split(";"):
                key, sep, val = entry.partition(":")
                if not sep:
                    # Not a key:value entry (blank line, other process
                    # output); ignore it instead of killing the thread.
                    continue
                self.internal_status[key] = val

            # Exit game on death
            if self.internal_status.get("dc", "0") != "0":
                self.keypress("Escape")
                self.dead = True

                # Flush stream reader until the cart announces a restart
                for stale in iter(stdout.readline, b""):
                    marker = stale.decode("utf-8", errors="replace")
                    if marker.rstrip("\r\n") == "!RESTART":
                        break


# Stage 1:
target_points = [
    (28, 88),    # Start pillar
    (60, 80),    # Middle pillar
    (105, 64),   # Right ledge
    (25, 40),    # Left ledge
    (110, 16),   # End ledge
    (110, -2),   # Next stage
]
# += 5


def main():
    """Start the game and report progress through ``target_points``."""
    next_point = 0
    c = Celeste()

    # Stop once every target has been reached instead of indexing past the
    # end of the list.
    while next_point < len(target_points):
        if c.dead:
            # NOTE(review): next_point is not reset here - confirm whether
            # progress should restart from the first target after a death.
            print("\n\nDead, resetting...")
            c.reset()

        # Take one snapshot per iteration; the status is empty until the
        # update thread has parsed its first line.
        try:
            status = c.status
        except KeyError:
            time.sleep(0.01)
            continue

        tx, ty = target_points[next_point]
        x = status["xpos"]
        y = status["ypos"]

        dist = math.hypot(x - tx, y - ty)
        if dist <= 4 and y == ty:
            next_point += 1

        print(f"Target point: {next_point:02}, Dist: {dist:0.3}")

    print("All target points reached.")


if __name__ == "__main__":
    main()