Mark
/
celeste-ai
Archived
1
0
Fork 0
This repository has been archived on 2023-11-28. You can view files and clone it, but cannot push or open issues/pull-requests.
celeste-ai/celeste/celeste.py

235 lines
4.6 KiB
Python
Raw Normal View History

2023-02-15 22:24:40 -08:00
import subprocess
import time
import threading
import math
2023-02-15 23:38:27 -08:00
from tqdm import tqdm
2023-02-15 22:24:40 -08:00
class CelesteError(Exception):
    """Error raised by the Celeste game wrapper.

    For example, it is raised when the game has not yet reported enough
    data to build a status dictionary.
    """
class Celeste:
    """Control a PICO-8 process running the Celeste cartridge via xdotool.

    The game prints one status line per frame on its standard output.
    `update_loop` parses those lines, tracks progress through a list of
    checkpoints and invokes the `on_get_state` callback once per frame.
    """

    action_space = [
        "left",     # move left
        "right",    # move right
        "jump",     # jump
        "dash-u",   # dash up
        "dash-r",   # dash right
        "dash-l",   # dash left
        "dash-ru",  # dash right-up
        "dash-lu"   # dash left-up
    ]

    # Keys pressed for each action, in the order they are sent to xdotool.
    _action_keys = {
        "left": ("Left",),
        "right": ("Right",),
        "jump": ("c",),
        "dash-u": ("Up", "x"),
        "dash-r": ("Right", "x"),
        "dash-l": ("Left", "x"),
        "dash-ru": ("Up", "Right", "x"),
        "dash-lu": ("Up", "Left", "x"),
    }

    # Every key released at the start of act(), in release order.
    _all_keys = ("x", "c", "Left", "Right", "Down", "Up")

    # Stage number, indexed as [ry][rx] with the values reported by the game.
    # NOTE(review): rx/ry are presumably room coordinates - confirm against
    # the cartridge.
    _stage_map = [
        [0, 1, 2, 3, 4]
    ]

    def __init__(self, on_get_state):
        """Start PICO-8, locate its window and run the cartridge.

        Args:
            on_get_state: Callable invoked as `on_get_state(self)` by
                `update_loop` after each frame has been parsed.

        Raises:
            CelesteError: If xdotool does not report exactly one window
                of class "pico8".
        """
        self.on_get_state = on_get_state

        # Start pico-8. An argv list is used so no shell is involved; the
        # pipe is binary, so readline() returns bytes.
        self.process = subprocess.Popen(
            ["bin/pico-8/linux/pico8"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )

        # Wait for window to open and get window id
        time.sleep(2)
        winid = subprocess.check_output([
            "xdotool",
            "search",
            "--class",
            "pico8"
        ]).decode("utf-8").strip().split("\n")
        if len(winid) != 1:
            # CelesteError subclasses Exception, so existing handlers that
            # catch Exception keep working.
            raise CelesteError("Could not find unique PICO-8 window id")
        self.winid = winid[0]

        # Load cartridge
        self.keystring("load hackcel.p8")
        self.keypress("Enter")
        self.keystring("run")
        self.keypress("Enter", post=1000)

        # Initialize variables
        self.internal_status = {}

        # Score system
        self.frame_counter = 0
        self.next_point = 0
        self.dist = 0  # distance to next point
        self.target_points = [
            [  # Stage 1
                (28, 88),   # Start pillar
                (60, 80),   # Middle pillar
                (105, 64),  # Right ledge
                (25, 40),   # Left ledge
                (110, 16),  # End ledge
                (110, -2),  # Next stage
            ]
        ]

    def act(self, action):
        """Release every control key, then hold the keys for `action`.

        Args:
            action: One of the names in `action_space`, or None to only
                release the keys. Any other value also just releases the
                keys.
        """
        for key in self._all_keys:
            self.keyup(key)

        if action is None:
            return

        for key in self._action_keys.get(action, ()):
            self.keydown(key)

    @property
    def status(self):
        """Return the current game state as a dictionary.

        The values come from the most recently parsed status line plus the
        checkpoint bookkeeping kept on this object.

        Raises:
            CelesteError: If a required field has not been received yet.
        """
        raw = self.internal_status
        try:
            return {
                "stage": self._stage_map[int(raw["ry"])][int(raw["rx"])],
                "xpos": int(raw["px"]),
                "ypos": int(raw["py"]),
                "xvel": float(raw["vx"]),
                "yvel": float(raw["vy"]),
                "deaths": int(raw["dc"]),
                "dist": self.dist,
                "next_point": self.next_point,
                "frame_count": self.frame_counter
            }
        except KeyError as err:
            raise CelesteError("Not enough data to get status.") from err

    def _xdotool(self, *args):
        """Run xdotool with the given arguments."""
        subprocess.run(["xdotool", *args])

    def keypress(self, key: str, *, post=200):
        """Press and release `key`, then sleep for `post` milliseconds."""
        self._xdotool("key", "--window", self.winid, key)
        time.sleep(post / 1000)

    def keydown(self, key: str):
        """Hold `key` down in the PICO-8 window."""
        self._xdotool("keydown", "--window", self.winid, key)

    def keyup(self, key: str):
        """Release `key` in the PICO-8 window."""
        self._xdotool("keyup", "--window", self.winid, key)

    def keystring(self, string, *, delay=100, post=200):
        """Type `string` into the PICO-8 window.

        Args:
            string: Text to type.
            delay: Value passed to xdotool's `--delay` option.
            post: Milliseconds to sleep after typing.
        """
        self._xdotool(
            "type",
            "--window", self.winid,
            "--delay", str(delay),
            string
        )
        time.sleep(post / 1000)

    def reset(self):
        """Restart the cartridge and clear all per-run state."""
        self.internal_status = {}
        self.next_point = 0
        self.frame_counter = 0
        self.dist = 0

        self.keypress("Escape")
        self.keystring("run")
        self.keypress("Enter", post=1000)

        self.flush_reader()

    def flush_reader(self):
        """Discard game output up to and including the next "!RESTART" line.

        Returns when the marker is seen or when the output stream ends.
        """
        # The pipe is binary, so end-of-stream is b"", not "".
        for raw in iter(self.process.stdout.readline, b""):
            if raw.decode("utf-8").strip() == "!RESTART":
                break

    @staticmethod
    def _distance(x, y, point):
        """Return the Euclidean distance from (x, y) to `point`."""
        tx, ty = point
        return math.sqrt(
            (x - tx) * (x - tx) +
            (y - ty) * (y - ty)
        )

    def _update_checkpoint(self):
        """Advance `next_point` if the current checkpoint was reached and
        refresh `dist`.

        When the current stage has no checkpoint left (or no checkpoint
        list at all), `dist` is set to 0 instead of raising IndexError.
        """
        status = self.status
        x = status["xpos"]
        y = status["ypos"]
        stage = status["stage"]

        if stage < len(self.target_points):
            points = self.target_points[stage]
        else:
            points = []

        if self.next_point >= len(points):
            self.dist = 0
            return

        target = points[self.next_point]
        dist = self._distance(x, y, target)

        # A checkpoint counts only when the player is close to it and at
        # exactly its y coordinate.
        if dist <= 4 and y == target[1]:
            print(f"Got point {self.next_point}")
            self.next_point += 1

            if self.next_point >= len(points):
                # Last checkpoint of this stage: nothing left to measure.
                self.dist = 0
                return

            # Recalculate distance to new point
            dist = self._distance(x, y, points[self.next_point])

        self.dist = dist

    def update_loop(self):
        """Process game output until the stream ends.

        One line => one frame. For every frame the status line is parsed,
        checkpoints are updated and `on_get_state(self)` is called.

        Raises:
            CelesteError: If a frame arrives before all status fields are
                known.
            ValueError: If a status entry is not of the form "key:value".
        """
        # The pipe is binary, so end-of-stream is b"", not "". With a ""
        # sentinel the loop would never terminate once PICO-8 exits.
        for raw in iter(self.process.stdout.readline, b""):
            line = raw.decode("utf-8").strip()

            # This should only occur at game start
            if line == "!RESTART":
                continue

            self.frame_counter += 1

            # Parse status string
            for entry in line.split(";"):
                if entry == "":
                    continue
                key, val = entry.split(":")
                self.internal_status[key] = val

            # Update checkpoints
            self._update_checkpoint()

            # Call step callback
            self.on_get_state(self)