Added celeste files
parent
443fb9bef2
commit
93b7ef2e04
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,213 @@
|
|||
import subprocess
|
||||
import time
|
||||
import threading
|
||||
import math
|
||||
|
||||
class Celeste:
|
||||
|
||||
def __init__(self):
|
||||
|
||||
# Start process
|
||||
self.process = subprocess.Popen(
|
||||
"bin/pico-8/pico8",
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT
|
||||
)
|
||||
|
||||
# Wait for window to open and get window id
|
||||
time.sleep(2)
|
||||
winid = subprocess.check_output([
|
||||
"xdotool",
|
||||
"search",
|
||||
"--class",
|
||||
"pico8"
|
||||
]).decode("utf-8").strip().split("\n")
|
||||
if len(winid) != 1:
|
||||
raise Exception("Could not find unique PICO-8 window id")
|
||||
self.winid = winid[0]
|
||||
|
||||
# Load cartridge
|
||||
self.keystring("load hackcel.p8")
|
||||
self.keypress("Enter")
|
||||
self.keystring("run")
|
||||
self.keypress("Enter", post = 1000)
|
||||
|
||||
# Initialize variables
|
||||
self.internal_status = {}
|
||||
self.dead = False
|
||||
|
||||
# -1: left
|
||||
# 0: not moving
|
||||
# 1: moving right
|
||||
self.moving = 0
|
||||
|
||||
# Start state update thread
|
||||
self.update_thread = threading.Thread(target = self._update_loop)
|
||||
self.update_thread.start()
|
||||
|
||||
def act(self, action):
|
||||
self.keyup("x")
|
||||
self.keyup("c")
|
||||
self.keyup("Down")
|
||||
self.keyup("Up")
|
||||
if self.moving != -1:
|
||||
self.keyup("Left")
|
||||
if self.moving != 1:
|
||||
self.keyup("Right")
|
||||
|
||||
if action is None:
|
||||
self.moving = 0
|
||||
self.keyup("Left")
|
||||
self.keyup("Right")
|
||||
elif action == "left":
|
||||
if self.moving != -1:
|
||||
self.keydown("Left")
|
||||
self.moving = -1
|
||||
elif action == "right":
|
||||
if self.moving != 1:
|
||||
self.keydown("Right")
|
||||
self.moving = 1
|
||||
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return {
|
||||
"stage": (
|
||||
[
|
||||
[0, 1, 2, 3, 4]
|
||||
]
|
||||
[int(self.internal_status["ry"])]
|
||||
[int(self.internal_status["rx"])]
|
||||
),
|
||||
|
||||
"xpos": int(self.internal_status["px"]),
|
||||
"ypos": int(self.internal_status["py"]),
|
||||
"xvel": float(self.internal_status["vx"]),
|
||||
"yvel": float(self.internal_status["vy"])
|
||||
}
|
||||
|
||||
|
||||
|
||||
# Possible actions
|
||||
@property
|
||||
def action_space(self):
|
||||
return [
|
||||
"left", # move left
|
||||
"rght", # move right
|
||||
"jump", # jump
|
||||
|
||||
"dshn", # dash north
|
||||
"dshe", # dash east
|
||||
"dshw", # dash west
|
||||
"dsne", # dash north-east
|
||||
"dsnw" # dash north-west
|
||||
]
|
||||
|
||||
|
||||
def keypress(self, key: str, *, post = 200):
|
||||
subprocess.run([
|
||||
"xdotool",
|
||||
"key",
|
||||
"--window", self.winid,
|
||||
key
|
||||
])
|
||||
time.sleep(post / 1000)
|
||||
|
||||
def keydown(self, key: str):
|
||||
subprocess.run([
|
||||
"xdotool",
|
||||
"keydown",
|
||||
"--window", self.winid,
|
||||
key
|
||||
])
|
||||
|
||||
def keyup(self, key: str):
|
||||
subprocess.run([
|
||||
"xdotool",
|
||||
"keyup",
|
||||
"--window", self.winid,
|
||||
key
|
||||
])
|
||||
|
||||
def keystring(self, string, *, delay = 100, post = 200):
|
||||
subprocess.run([
|
||||
"xdotool",
|
||||
"type",
|
||||
"--window", self.winid,
|
||||
"--delay", str(delay),
|
||||
string
|
||||
])
|
||||
time.sleep(post / 1000)
|
||||
|
||||
def reset(self):
|
||||
self.internal_status = {}
|
||||
if not self.dead:
|
||||
self.keypress("Escape")
|
||||
self.keystring("run")
|
||||
self.keypress("Enter", post = 1000)
|
||||
self.dead = False
|
||||
|
||||
def _update_loop(self):
|
||||
# Poll process for new output until finished
|
||||
for line in iter(self.process.stdout.readline, ""):
|
||||
l = line.decode("utf-8")[:-1]
|
||||
|
||||
if l in ["!RESTART"]:
|
||||
continue
|
||||
|
||||
for entry in l.split(";"):
|
||||
key, val = entry.split(":")
|
||||
self.internal_status[key] = val
|
||||
|
||||
# Exit game on death
|
||||
if "dc" in self.internal_status and self.internal_status["dc"] != "0":
|
||||
self.keypress("Escape")
|
||||
self.dead = True
|
||||
|
||||
# Flush stream reader
|
||||
for k in iter(self.process.stdout.readline, ""):
|
||||
k = k.decode("utf-8")[:-1]
|
||||
if k == "!RESTART":
|
||||
break
|
||||
|
||||
|
||||
# Stage 1:
|
||||
|
||||
|
||||
next_point = 0
|
||||
target_points = [
|
||||
(28, 88), # Start pillar
|
||||
(60, 80), # Middle pillar
|
||||
(105, 64), # Right ledge
|
||||
(25, 40), # Left ledge
|
||||
(110, 16), # End ledge
|
||||
(110, -2), # Next stage
|
||||
]
|
||||
|
||||
# += 5
|
||||
|
||||
c = Celeste()
|
||||
while True:
|
||||
if c.dead:
|
||||
print("\n\nDead, resetting...")
|
||||
c.reset()
|
||||
|
||||
|
||||
tx, ty = target_points[next_point]
|
||||
x = c.status["xpos"]
|
||||
y = c.status["ypos"]
|
||||
|
||||
dist = math.sqrt(
|
||||
(x-tx)*(x-tx) +
|
||||
(y-ty)*(y-ty)
|
||||
)
|
||||
|
||||
if dist <= 4 and y == ty:
|
||||
next_point += 1
|
||||
|
||||
print(f"Target point: {next_point:02}, Dist: {dist:0.3}")
|
||||
|
||||
#print()
|
||||
#print(c.status)
|
||||
|
Reference in New Issue