import subprocess import time import threading import math from tqdm import tqdm class CelesteError(Exception): pass class Celeste: action_space = [ "left", # move left "right", # move right "jump", # jump "dash-u", # dash up "dash-r", # dash right "dash-l", # dash left "dash-ru", # dash right-up "dash-lu" # dash left-up ] def __init__(self): # Start pico-8 self.process = subprocess.Popen( "bin/pico-8/linux/pico8", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) # Wait for window to open and get window id time.sleep(2) winid = subprocess.check_output([ "xdotool", "search", "--class", "pico8" ]).decode("utf-8").strip().split("\n") if len(winid) != 1: raise Exception("Could not find unique PICO-8 window id") self.winid = winid[0] # Load cartridge self.keystring("load hackcel.p8") self.keypress("Enter") self.keystring("run") self.keypress("Enter", post = 1000) # Initialize variables self.internal_status = {} self.before_out = None self.last_point_frame = 0 # Score system self.frame_counter = 0 self.next_point = 0 self.dist = 0 # distance to next point self.target_points = [ [ # Stage 1 (28, 88), # Start pillar (60, 80), # Middle pillar (105, 64), # Right ledge (25, 40), # Left ledge (110, 16), # End ledge (110, -2), # Next stage ] ] def act(self, action): self.keyup("x") self.keyup("c") self.keyup("Left") self.keyup("Right") self.keyup("Down") self.keyup("Up") if action is None: return elif action == "left": self.keydown("Left") elif action == "right": self.keydown("Right") elif action == "jump": self.keydown("c") elif action == "dash-u": self.keydown("Up") self.keydown("x") elif action == "dash-r": self.keydown("Right") self.keydown("x") elif action == "dash-l": self.keydown("Left") self.keydown("x") elif action == "dash-ru": self.keydown("Up") self.keydown("Right") self.keydown("x") elif action == "dash-lu": self.keydown("Up") self.keydown("Left") self.keydown("x") @property def status(self): try: return { "stage": ( [ [0, 1, 2, 3, 4] ] [int(self.internal_status["ry"])] [int(self.internal_status["rx"])] ), "xpos": int(self.internal_status["px"]), "ypos": int(self.internal_status["py"]), "xvel": float(self.internal_status["vx"]), "yvel": float(self.internal_status["vy"]), "deaths": int(self.internal_status["dc"]), "dist": self.dist, "next_point": self.next_point, "frame_count": self.frame_counter } except KeyError: raise CelesteError("Not enough data to get status.") def keypress(self, key: str, *, post = 200): subprocess.run([ "xdotool", "key", "--window", self.winid, key ]) time.sleep(post / 1000) def keydown(self, key: str): subprocess.run([ "xdotool", "keydown", "--window", self.winid, key ]) def keyup(self, key: str): subprocess.run([ "xdotool", "keyup", "--window", self.winid, key ]) def keystring(self, string, *, delay = 100, post = 200): subprocess.run([ "xdotool", "type", "--window", self.winid, "--delay", str(delay), string ]) time.sleep(post / 1000) def reset(self): self.internal_status = {} self.next_point = 0 self.frame_counter = 0 self.before_out = None self.resetting = True self.last_point_frame = 0 self.keypress("Escape") self.keystring("run") self.keypress("Enter", post = 1000) self.flush_reader() def flush_reader(self): for k in iter(self.process.stdout.readline, ""): k = k.decode("utf-8")[:-1] if k == "!RESTART": break def update_loop(self, before, after): # Get state, call callback, wait for state # One line => one frame. it = iter(self.process.stdout.readline, "") for line in it: l = line.decode("utf-8")[:-1].strip() self.resetting = False # This should only occur at game start if l in ["!RESTART"]: continue self.frame_counter += 1 # Parse status string for entry in l.split(";"): if entry == "": continue key, val = entry.split(":") self.internal_status[key] = val # Update checkpoints tx, ty = self.target_points[self.status["stage"]][self.next_point] x = self.status["xpos"] y = self.status["ypos"] dist = math.sqrt( (x-tx)*(x-tx) + (y-ty)*(y-ty) ) if dist <= 4 and y == ty: print(f"Got point {self.next_point}") self.next_point += 1 self.last_point_frame = self.frame_counter # Recalculate distance to new point tx, ty = self.target_points[self.status["stage"]][self.next_point] dist = math.sqrt( (x-tx)*(x-tx) + (y-ty)*(y-ty) ) # Timeout if we spend too long between points elif self.frame_counter - self.last_point_frame > 40: self.internal_status["dc"] = str(int(self.internal_status["dc"]) + 1) self.dist = dist # Call step callbacks if self.before_out is not None: after(self, self.before_out) if not self.resetting: self.before_out = before(self)